This article introduces how to accurately and efficiently load DB data into a Hive data warehouse, covering two aspects: real-time Binlog collection and offline restoration of business data from the collected Binlog.

Background

In data warehouse modeling, raw business-layer data that has not been processed in any way is called ODS (Operational Data Store) data. In Internet companies, common ODS data includes business log data and business DB data. Collecting business DB data from relational databases such as MySQL and importing it into Hive is an important step in data warehouse production. How can MySQL data be synchronized to Hive accurately and efficiently? The common solution is batch extract-and-load: connect directly to MySQL, select the table data, save it to local files as intermediate storage, and then load the files into a Hive table. The advantage of this scheme is that it is simple to implement, but as the business grows its disadvantages are gradually exposed:

  • Performance bottleneck: as the business scale grows, Select from MySQL -> save to local file -> Load to Hive takes longer and longer, and can no longer meet the timeliness requirements of downstream warehouse production.
  • Selecting large amounts of data directly from MySQL puts heavy pressure on MySQL, causing slow queries and affecting normal services on the business side.
  • Hive only supports Update and Delete primitives on transactional tables (bucketed, ORC-stored), so the Update/Delete operations that occur in MySQL cannot be conveniently reflected in Hive.

To solve these problems thoroughly, we gradually turned to a CDC (Change Data Capture) + Merge solution: real-time Binlog collection plus offline restoration of the business data from the collected Binlog. The Binlog is MySQL's binary log, which records all data changes in the database; the primary/secondary replication of a MySQL cluster is itself based on the Binlog.

Implementation approach

First, Flink pulls the Binlog data from Kafka and writes it to HDFS.

Then, for each ODS table, a snapshot needs to be created first by reading the full existing data in MySQL into Hive. In this step we connect directly to MySQL to select the data; a tool such as Sqoop can be used to import the full data in one pass (a sketch of the target Hive table is given at the end of this section).

Finally, each ODS table is merged daily: the existing stock data is combined with the Binlog produced by the day's increment to restore the business data.

The Binlog is generated as a stream. By collecting it in real time, part of the data processing work is shifted from the once-a-day batch job to the real-time stream, which significantly improves both performance and the access pressure on MySQL. The Binlog itself records the type of every data change (Insert/Update/Delete), so with some semantic processing the data can be restored accurately.
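To make the snapshot step concrete, the full import can land in a plain Hive table whose columns mirror the MySQL table. The following is a minimal sketch based on the code_city example used later in this article; the storage format is an assumption (Sqoop's Hive import writes text files by default), and the column types are illustrative:

-- Stock table holding the full snapshot of the MySQL table.
-- It is overwritten by the daily merge described later in this article.
CREATE TABLE IF NOT EXISTS code_city (
    id          BIGINT,
    city        STRING,
    province    STRING,
    event_time  STRING
)
STORED AS TEXTFILE;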

Implementation scheme

Flink processes the Binlog from Kafka

Use the Kafka source, parse each consumed record as JSON, and concatenate the parsed fields into a comma-delimited string that follows the Hive schema (a sample of the Canal JSON record is shown after the code). The code is as follows:

package com.etl.kafka2hdfs;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Map;
import java.util.Properties;

/**
 * @Created with IntelliJ IDEA.
 * @author : jmx
 * @Date: 2020/3/27
 * @Time: 12:52
 */
public class HdfsSink {
    public static void main(String[] args) throws Exception {
        String fieldDelimiter = ",";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // checkpoint
        env.enableCheckpointing(10_000);
        //env.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
        env.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
        // only required for Kafka 0.8
        props.setProperty("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181");
        props.setProperty("group.id", "test123");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "qfbap_ods.code_city", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(consumer);

        // transform
        SingleOutputStreamOperator<String> cityDS = stream
                .filter(new FilterFunction<String>() {
                    // Filter out DDL operations
                    @Override
                    public boolean filter(String jsonVal) throws Exception {
                        JSONObject record = JSON.parseObject(jsonVal, Feature.OrderedField);
                        return record.getString("isDdl").equals("false");
                    }
                })
                .map(new MapFunction<String, String>() {

                    @Override
                    public String map(String value) throws Exception {
                        StringBuilder fieldsBuilder = new StringBuilder();
                        // Parse JSON data
                        JSONObject record = JSON.parseObject(value, Feature.OrderedField);
                        // Get the latest field value
                        JSONArray data = record.getJSONArray("data");
                        // Traversal, JSON array of field values, only one element
                        for (int i = 0; i < data.size(); i++) {
                            // Get the ith element of the JSON array
                            JSONObject obj = data.getJSONObject(i);
                            if (obj != null) {
                                fieldsBuilder.append(record.getLong("id")); // serial number id
                                fieldsBuilder.append(fieldDelimiter); // Field separator
                                fieldsBuilder.append(record.getLong("es")); // Business timestamp
                                fieldsBuilder.append(fieldDelimiter);
                                fieldsBuilder.append(record.getLong("ts")); // Log timestamp
                                fieldsBuilder.append(fieldDelimiter);
                                fieldsBuilder.append(record.getString("type")); // Operation type
                                for (Map.Entry<String, Object> entry : obj.entrySet()) {
                                    fieldsBuilder.append(fieldDelimiter);
                                    fieldsBuilder.append(entry.getValue()); // Table field data
                                }
                            }
                        }
                        return fieldsBuilder.toString();
                    }
                });

        //cityDS.print();
        //stream.print();

        // sink
        // A new part file is rolled when any of the following conditions is met
        RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create()
                .withRolloverInterval(60L * 1000L) // Roll to a new file after this interval (default 60s); adjust as needed
                .withMaxPartSize(1024 * 1024 * 128L) // Maximum size of each part file (default 128MB)
                .withInactivityInterval(60L * 1000L) // Roll if no data has been written for this inactivity timeout (default 60s)
                .build();
        
        StreamingFileSink<String> sink = StreamingFileSink
                //.forRowFormat(new Path("file:///E://binlog_db/city"), new SimpleStringEncoder<String>())
                .forRowFormat(new Path("hdfs://kms-1:8020/binlog_db/code_city_delta"), new SimpleStringEncoder<String>())
                .withBucketAssigner(new EventTimeBucketAssigner())
                .withRollingPolicy(rollingPolicy)
                .withBucketCheckInterval(1000)  // Bucket check interval: 1s
                .build();

        cityDS.addSink(sink);
        env.execute();
    }
}
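For reference, the filter and map functions above assume the flat JSON format that Canal writes to Kafka. A simplified sample record is shown below; the field values are illustrative, and the mysqlType/sqlType maps that Canal also emits are omitted for brevity:

{
    "data": [
        { "id": "1", "city": "Shenzhen", "province": "Guangdong", "event_time": "2020-03-27 12:00:00" }
    ],
    "database": "qfbap_ods",
    "es": 1585281600000,
    "id": 1,
    "isDdl": false,
    "old": null,
    "pkNames": ["id"],
    "sql": "",
    "table": "code_city",
    "ts": 1585281602000,
    "type": "INSERT"
}

The job concatenates id, es, ts and type, followed by each column value in the data array, into one comma-delimited line per record.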

For the Flink sink to HDFS, StreamingFileSink replaces BucketingSink to store the upstream data into different HDFS directories. Its core logic is bucket assignment. The default bucket assigner is DateTimeBucketAssigner, which assigns buckets by processing time, i.e. the time at which a message arrives in the Flink program; that is not what we want here. Therefore, we need to write our own bucket assigner that parses the event time from the message body and uses it to build the bucket (partition directory) name. The specific code is as follows:

package com.etl.kafka2hdfs;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Created with IntelliJ IDEA.
 * @author : jmx
 * @Date: 2020/3/27
 */

public class EventTimeBucketAssigner implements BucketAssigner<String, String> {

    @Override
    public String getBucketId(String element, Context context) {
        String partitionValue;
        try {
            partitionValue = getPartitionValue(element);
        } catch (Exception e) {
            partitionValue = "00000000";
        }
        return "dt=" + partitionValue; // Partition directory name
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
    private String getPartitionValue(String element) throws Exception {

        // Fetch the es field (the second element) of the concatenated string, which is the business timestamp
        long eventTime = Long.parseLong(element.split(",")[1]);
        Date eventDate = new Date(eventTime);
        return new SimpleDateFormat("yyyyMMdd").format(eventDate);
    }
}

Restore MySQL data offline

After the preceding steps, the Binlog records are written into the corresponding partitions on HDFS. Next, the latest data needs to be restored from the incremental data and the existing stock data. Hive tables are stored on HDFS, a file system that does not support in-place modification, so some extra work is needed to apply the data changes. Common approaches include JOIN, Hive transactions, HBase, and Kudu.
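Before the merge can run, the delta files on HDFS need to be visible to Hive. Below is a minimal sketch of an external table over the sink path, assuming the column layout follows the concatenation order in the Flink job (Binlog id, es, ts, operation type, then the table columns); column names not used in the merge SQL below are illustrative:

CREATE EXTERNAL TABLE IF NOT EXISTS code_city_delta (
    binlog_id   BIGINT,    -- Binlog record id
    es          BIGINT,    -- business timestamp (ms)
    ts          BIGINT,    -- log timestamp (ms)
    dml_type    STRING,    -- INSERT / UPDATE / DELETE
    id          BIGINT,    -- MySQL table columns follow, in column order
    city        STRING,
    province    STRING,
    event_time  STRING
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 'hdfs://kms-1:8020/binlog_db/code_city_delta';

-- The dt= directories written by EventTimeBucketAssigner still have to be registered, e.g. daily:
ALTER TABLE code_city_delta ADD IF NOT EXISTS PARTITION (dt = '20200324');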

For example, yesterday's stock data code_city and today's incremental data code_city_delta can be combined with a FULL OUTER JOIN into a new stock table, which then serves as tomorrow's stock data:

INSERT OVERWRITE TABLE code_city
SELECT 
        COALESCE( t2.id, t1.id ) AS id,
        COALESCE ( t2.city, t1.city ) AS city,
        COALESCE ( t2.province, t1.province ) AS province,
        COALESCE ( t2.event_time, t1.event_time ) AS event_time 
FROM
        code_city t1
        FULL OUTER JOIN (
SELECT
        id,
        city,
        province,
        event_time 
FROM
        ( -- Take the latest state of each record
SELECT
        id,
        city,
        province,
        dml_type,
        event_time,
        row_number ( ) over ( PARTITION BY id ORDER BY event_time DESC ) AS rank 
FROM
        code_city_delta 
WHERE
        dt = '20200324' -- partition holding the day's incremental data
        ) temp 
WHERE
        rank = 1 
        ) t2 ON t1.id = t2.id;
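One caveat: the FULL OUTER JOIN above keeps a row even when its latest Binlog operation is a delete. Assuming dml_type carries the operation type emitted by Canal (INSERT/UPDATE/DELETE), a sketch of a variant that also selects dml_type from the deduplicated delta and filters deleted records out of the new stock data could look like this:

INSERT OVERWRITE TABLE code_city
SELECT
        COALESCE( t2.id, t1.id ) AS id,
        COALESCE( t2.city, t1.city ) AS city,
        COALESCE( t2.province, t1.province ) AS province,
        COALESCE( t2.event_time, t1.event_time ) AS event_time
FROM
        code_city t1
        FULL OUTER JOIN (
SELECT
        id,
        city,
        province,
        dml_type,
        event_time
FROM
        ( -- Take the latest state of each record
SELECT
        id,
        city,
        province,
        dml_type,
        event_time,
        row_number ( ) over ( PARTITION BY id ORDER BY event_time DESC ) AS rank
FROM
        code_city_delta
WHERE
        dt = '20200324'
        ) temp
WHERE
        rank = 1
        ) t2 ON t1.id = t2.id
WHERE
        t2.dml_type IS NULL          -- the record was not changed today
        OR t2.dml_type <> 'DELETE';  -- or its latest change is not a delete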

Summary

This article has mainly introduced how to implement real-time ETL with Flink, from the two aspects of Binlog stream collection and restoring ODS data from the Binlog. In addition, the Binlog records can be written into storage engines that support record-level updates, such as Kudu and HBase, in which case the table-restoration step can be omitted. This article is the second part of “Implementing Real-time Incremental Data Synchronization Based on Canal and Flink”; for the steps of parsing the Binlog with Canal and writing it to Kafka, see that article.

Reference:

[1] tech.meituan.com/2018/12/06/… Real-time Incremental Data Synchronization Based on Canal and Flink