This article discusses the technical feasibility of writing to Hive in real time with Flink 1.12 (the latest release at the time of writing) in a Change Data Capture (CDC) scenario. A local IDEA program example is provided below for reference.

1. Overall idea

Change Data Capture (CDC) is a technique for capturing changed data from a database in real time, optionally processing it, and applying it to a target system. To capture changes in real time, this article introduces Debezium as the database connector. Debezium provides connectors for MongoDB, MySQL, PostgreSQL, SQL Server, Oracle, Db2, Cassandra, and Vitess (the Oracle, Db2, Cassandra, and Vitess connectors are still incubating). Both the existing (full snapshot) data and the incremental (change) data of the database are sent out through Kafka topics. During processing, a Kafka consumer is created and subscribed to the corresponding topic to read this data. Flink also offers Flink SQL CDC connectors (contributed by the Alibaba technical team), but they are not shipped with the Flink 1.12 release; support for MySQL, PostgreSQL, and other databases is expected in version 1.13.

When Debezium starts for the first time, it scans the whole table and sends the schema information along with a full snapshot of the data, and then captures change events (incremental data) in real time, ensuring data consistency between source and target. Before sending the snapshot, it first publishes the schema information of the databases and tables through a history topic, and a topic named after the database and table is created for each captured table.
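
For reference, a minimal registration of the Debezium MySQL connector submitted to Kafka Connect might look like the sketch below. The host, credentials, and topic names are placeholders, and property names vary between Debezium versions, so check the Debezium documentation for the version you use:

{
  "name": "mysql-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "******",
    "database.server.id": "1",
    "database.server.name": "mysqlserver",
    "database.include.list": "test",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-history.test"
  }
}

With a configuration like this, change events for a table such as test.log are published to a topic named mysqlserver.test.log, while schema changes go to the history topic.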

Table structure information captured by Debezium

Data captured by Debezium
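
Since the screenshots are not reproduced here, the following is a simplified sketch of the payload part of a Debezium change event for an update on an illustrative test.log table (the envelope fields before, after, source, op, and ts_ms are standard; the column values are made up):

{
  "before": { "id": "1001", "log": "old content" },
  "after":  { "id": "1001", "log": "new content" },
  "source": { "connector": "mysql", "db": "test", "table": "log", "ts_ms": 1610000000000 },
  "op": "u",
  "ts_ms": 1610000000123
}

The op field is c for insert, u for update, d for delete, and r for rows read during the initial snapshot; for deletes, after is null and the row content is carried in before.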

2. Introduce dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sequence-file</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.22</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-metastore</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libfb303</artifactId>
        <version>0.9.3</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

3. Create an execution environment

// Create the streaming execution environment
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(1);
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
streamEnv.enableCheckpointing(60000);

// Create the table environment with the Blink planner in streaming mode
EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, envSettings);
// Checkpointing is required for the Hive streaming sink to commit files
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
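
For completeness, the snippets in this article rely roughly on the following imports (assuming Flink 1.12.0; package locations can differ slightly between Flink versions):

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
// ObjectNode/JsonNode come from the Jackson copy shaded into the Flink Kafka connector
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;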

4. Register the Hive Catalog

Register a Hive catalog so that Flink can read from and write to Hive:

String defaultDatabase = "test"; // Default database name String hiveConfDir = "D:\\"; // hive-site. XML path String version = "3.1.2"; HiveCatalog HiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir,version); HiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir,version); tableEnv.registerCatalog(name, hiveCatalog); tableEnv.useCatalog(name);Copy the code

5. Connect to the Kafka data source

properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
/ / create kafka cunsumer
FlinkKafkaConsumer<ObjectNode> flinkKafkaConsumer =new FlinkKafkaConsumer
      
       ("topic", new JSONKeyValueDeserializationSchema(true), properties)
      
flinkKafkaConsumer.setStartFromEarliest();     // start from the earliest record possible
// Add kafka Cunsumer to the datasource
DataStream
      
        stream
      =streamEnv.addSource(flinkKafkaConsumer);
Copy the code

6. Business-related code

String[] fieldNames = {"id", "log", "op"};
TypeInformation[] types = {Types.STRING, Types.STRING, Types.STRING};
// Business-related mapping logic (not expanded here): extract id, log and op from each change event
SingleOutputStreamOperator<Row> mapedStream =
        dataStreamSource.map(new GetIncDataMap(), new RowTypeInfo(types, fieldNames));
// Register the Flink stream as a temporary view
tableEnv.createTemporaryView("kafkaRow", mapedStream);
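
The GetIncDataMap function is not expanded in the original article. Below is a minimal sketch of what such a mapper might look like, assuming the Debezium JSON converter has schemas enabled (so the change data sits under value.payload) and that the captured table has id and log columns; the class name comes from the snippet above, everything else is an assumption:

// Hypothetical sketch: turn a Debezium change event (ObjectNode) into a Row of (id, log, op)
public class GetIncDataMap implements MapFunction<ObjectNode, Row> {
    @Override
    public Row map(ObjectNode node) throws Exception {
        // JSONKeyValueDeserializationSchema places the Kafka record value under "value"
        JsonNode payload = node.get("value").get("payload");
        String op = payload.get("op").asText();                      // c / u / d / r
        // For deletes "after" is null, so read the row content from "before"
        JsonNode image = "d".equals(op) ? payload.get("before") : payload.get("after");
        return Row.of(image.get("id").asText(), image.get("log").asText(), op);
    }
}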

7. Execute specific SQL

Insert the Kafka stream table into Hive:

tableEnv.executeSql("DROP TABLE IF EXISTS hivelogtab").print();
tableEnv.executeSql("CREATE TABLE hivelogtab ( id STRING,log STRING,op STRING)").print();
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
//        CloseableIterator<Row> result = tableEnv.sqlQuery("SELECT id,log,op FROM  kafkaRow").execute().collect();
//        while(result.hasNext()){
//            System.out.println(result.next());
//        }
TableResult tableResult = tableEnv.executeSql("INSERT INTO  hiveCatalog.test.hivelogtab SELECT id,log,op FROM  kafkaRow");        
streamEnv.execute("job");
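
Note that the snippet above switches the SQL dialect back to SqlDialect.DEFAULT after the DDL, which implies the Hive dialect was active when the table was created. A common pattern (a sketch, not taken verbatim from the original code) is:

// Use the Hive dialect for Hive DDL ...
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("CREATE TABLE IF NOT EXISTS hivelogtab ( id STRING, log STRING, op STRING )");
// ... then switch back to the default (Flink) dialect for queries and inserts
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql("INSERT INTO hiveCatalog.test.hivelogtab SELECT id, log, op FROM kafkaRow");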

8. Test

MySQL source data

Data synchronized to the Hive target


Note: Hive is a data warehouse designed for analytical workloads and does not support UPDATE and DELETE in the usual way, whereas in a CDC scenario there is no guarantee that every operation on the source database is an INSERT. The following approach can therefore be used (refer to the architecture and practice of the Meituan database platform):

(Image from the internet; it will be removed if it infringes any rights.)
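
Since the diagram is not reproduced here, the underlying idea can be summarized as follows: every change record is appended to Hive together with its operation flag (and, ideally, an event timestamp), and the latest state of each row is recovered when the data is queried or periodically merged. A rough sketch of such a merge query, assuming a ts event-timestamp column is also written (the example table above does not include one yet):

-- Keep only the most recent change record per primary key;
-- rows whose latest operation is a delete are filtered out.
SELECT id, log
FROM (
    SELECT id, log, op,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS rn
    FROM hivelogtab
) t
WHERE rn = 1 AND op <> 'd';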

TO DO :

Automatically parse the DDL statements in the schema information (including resolving data-type and keyword differences between source and target).

Follow the official account and add the author's WeChat to discuss further.