[TOC]

Real-time processing vs. real-time data

Flink and Spark Streaming are typically used for real-time processing of real-time data: the data arrives in real time and must also be processed with low latency. The opposite scenario, where the data is not real-time and the processing is not timely, is the T+1 data of a traditional data warehouse.

The Apache Hudi scenario discussed in this article is about real-time data, not real-time processing. The goal is to mirror data from MySQL onto big data platforms such as Hive in near real time.

Business scenario and technology selection

A traditional offline data warehouse usually provides T+1 data, which cannot meet the demand for same-day data analysis. Streaming computing, on the other hand, is generally window-based, and the window logic is relatively fixed. My company has a particular need: business analysts who are familiar with the data structures of the existing transaction databases want to run a lot of ad-hoc analysis that includes the current day's real-time data. Usually they work against a MySQL replica and do the corresponding analysis and calculation directly in SQL. But this approach often runs into the following obstacles:

  • When the data volume is large and the analysis logic is complex, MySQL takes a long time to return results
  • Some cross-database analysis is simply not possible

As a result, several technical frameworks have emerged that bridge the gap between OLTP and OLAP, TiDB being a typical example: it supports both OLTP and OLAP. Apache Hudi and Apache Kudu, by contrast, act as bridges between existing OLTP and OLAP technologies. They store data in OLTP-like data structures, support CRUD, and integrate with existing OLAP frameworks (e.g. Hive, Impala) for OLAP analysis.

Apache Kudu requires a separate cluster deployment, while Apache Hudi does not: it can store its data files on an existing big data cluster such as HDFS and then use Hive for data analysis. It is therefore relatively suitable for resource-constrained environments.

Overall approach with Apache Hudi

Hudi provides the concept of a Hudi table that supports CRUD operations. By replaying MySQL binlog data into a Hudi table, we can then use Hive to query and analyze it. The data flow architecture is as follows

Hudi table data structure

The data files of a Hudi table can be stored on the operating system's file system or on a distributed file system such as HDFS. For analysis performance and data reliability, HDFS is generally used. In terms of HDFS storage, the files of a Hudi table fall into two categories.

  • Paths containing _partition_key hold the actual data files, organized by partition. The partition path key can be customized; I used _partition_key
  • .hoodie: because CRUD operations are fragmented, each operation generates files, and as more and more small files accumulate, HDFS performance degrades severely. Hudi therefore has a file-merging mechanism, and the log files related to these merge operations are stored in the .hoodie folder.

Data files

The actual Hudi data files are stored in the Parquet file format

The .hoodie folder

Hudi refers to the series of CRUD operations applied to a table over time as the Timeline. A single operation in the Timeline is called an Instant. An Instant contains the following information

  • Instant Action: records whether the action is a data commit (COMMITS), a file compaction, or a file cleanup
  • Instant Time: the time at which the operation occurred
  • State: the state of the operation, one of REQUESTED, INFLIGHT, or COMPLETED

The .hoodie folder stores the status records of these operations

Hudi record Id

To implement CRUD, Hudi needs to be able to uniquely identify a record. To do this, Hudi combines the unique field of the dataset (the record key) with the partition path (partitionPath) of the data to form the unique key of a record.
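
To make this concrete, here is a minimal sketch (not the wrapper's actual code) of how the record key and partition path are typically declared when writing a DataFrame to a Hudi table through the Spark datasource. The option keys are standard Hudi write options; the field names id and _partition_key and the HDFS path reuse this article's defaults, while the input path and class name are made up for illustration.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HudiRecordKeyExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("hudi-record-key-example").getOrCreate();

        // Hypothetical input: a batch of rows to upsert into the Hudi table.
        Dataset<Row> rows = spark.read().json("/tmp/binlog-sample.json");

        rows.write()
            .format("org.apache.hudi")
            // The record key plus the partition path together form the unique key of a record.
            .option("hoodie.datasource.write.recordkey.field", "id")
            .option("hoodie.datasource.write.partitionpath.field", "_partition_key")
            .option("hoodie.datasource.write.precombine.field", "id")
            .option("hoodie.datasource.write.operation", "upsert")
            .option("hoodie.table.name", "crm__order")
            .mode(SaveMode.Append)
            .save("hdfs://192.168.16.181:8020/hudi_data/crm__order");
    }
}
```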

COW and MOR

Building on these basic concepts, Hudi provides two table types, COW and MOR, which differ somewhat in data write and query performance.

Copy On Write Table

Hereinafter referred to as COW. As the name suggests, data is copied when it is written and the new data is merged into that copy. A read request therefore reads a near-complete copy of the data, similar in spirit to MySQL's MVCC.

In the figure above, each color contains all of the data up to its point in time. Old copies of the data are deleted once they exceed a certain retention limit. For this type of table there is no compaction Instant, because the data is already compacted when it is written.

  • Advantages: a read only needs the data file of the corresponding partition, which is efficient
  • Disadvantages: a write must copy the original data and build a new data file from that copy, which is time-consuming; and because of that cost, the data visible to a read request lags behind

Merge On Read Table

Referred to as MOR. Newly inserted data is first written to a delta log, and the delta logs are periodically merged into the Parquet data files. When data is read, the delta log is merged with the older data files to return complete data; alternatively, like a COW table, a MOR table can ignore the delta log and read only the most recent complete data files. The following figure illustrates the two ways MOR reads and writes data.

  • Advantages: because writes only append to the delta log, and the delta log is small, the write cost is low
  • Disadvantages: the table must be compacted periodically, otherwise many fragmented files accumulate; read performance is poorer because the delta log has to be merged with the old data files
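
As a reference point, the sketch below shows how the table type is selected at write time and how the read view is selected at query time with the Spark datasource in recent Hudi releases. The option keys are Hudi datasource options; the helper class and method names are invented for illustration.

```java
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Row;

public class HudiTableTypeExample {

    // MOR trades write cost for read cost; COPY_ON_WRITE is used if the option is omitted.
    static DataFrameWriter<Row> asMergeOnRead(DataFrameWriter<Row> writer) {
        return writer
                .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
                // Let Hudi compact delta logs into Parquet files inline, every N delta commits.
                .option("hoodie.compact.inline", "true")
                .option("hoodie.compact.inline.max.delta.commits", "10");
    }

    // "snapshot" merges delta logs with base files; "read_optimized" reads only the compacted files.
    static DataFrameReader readOptimized(DataFrameReader reader) {
        return reader.option("hoodie.datasource.query.type", "read_optimized");
    }
}
```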

Hudi-based code implementation

I have put a Hudi-based wrapper on GitHub; the source is at github.com/wanqiufeng/…

Writing binlog data to the Hudi table

  • The binlog-consumer branch uses Spark Streaming to consume binlog data from Kafka and writes it to the Hudi table. The binlog is pulled into Kafka by Alibaba's Canal tool. The program entry point is CanalKafkaImport2Hudi, which provides a set of parameters to configure its execution behavior
| Parameter | Meaning | Required | Default |
| --- | --- | --- | --- |
| --base-save-path | Base HDFS path under which Hudi tables are stored, e.g. hdfs://192.168.16.181:8020/hudi_data/ | yes | none |
| --mapping-mysql-db-name | Name of the MySQL database to process | yes | none |
| --mapping-mysql-table-name | Name of the MySQL table to process | yes | none |
| --store-table-name | Hudi table name | no | Generated from --mapping-mysql-db-name and --mapping-mysql-table-name. If --mapping-mysql-db-name is crm and --mapping-mysql-table-name is order, the final Hudi table is named crm__order |
| --real-save-path | Final HDFS path where the Hudi table is stored | no | Generated from --base-save-path and --store-table-name as '--base-save-path' + '/' + '--store-table-name'; the default is recommended |
| --primary-key | Field that uniquely identifies a record in the synchronized MySQL table | no | id |
| --partition-key | Time field in the MySQL table used for partitioning; must be of type timestamp or datetime | yes | none |
| --precombine-key | Used to configure Hudi's hoodie.datasource.write.precombine.field | no | id |
| --kafka-server | Kafka cluster address | yes | none |
| --kafka-topic | Kafka topic to consume | yes | none |
| --kafka-group | Kafka consumer group | no | 'hudi' prefixed to the storage table name, e.g. hudi_crm__order |
| --duration-seconds | The program is built on Spark Streaming; this sets the micro-batch duration | no | 10 seconds |

A working demo is shown below

```shell
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \
    --name hudi__goods \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
    --num-executors 1 \
    --queue hudi \
    --conf spark.executor.memoryOverhead=2048 \
    --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
    --conf spark.core.connection.ack.wait.timeout=300 \
    --conf spark.locality.wait=100 \
    --conf spark.streaming.backpressure.enabled=true \
    --conf spark.streaming.receiver.maxRate=500 \
    --conf spark.streaming.kafka.maxRatePerPartition=200 \
    --conf spark.ui.retainedJobs=10 \
    --conf spark.ui.retainedStages=10 \
    --conf spark.ui.retainedTasks=10 \
    --conf spark.worker.ui.retainedExecutors=10 \
    --conf spark.worker.ui.retainedDrivers=10 \
    --conf spark.sql.ui.retainedExecutions=10 \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures=20 \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    /data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar \
    --kafka-server local:9092 \
    --kafka-topic dt_streaming_canal_xxx \
    --base-save-path hdfs://192.168.2.1:8020/hudi_table/ \
    --mapping-mysql-db-name crm \
    --mapping-mysql-table-name order \
    --primary-key id \
    --partition-key createDate \
    --duration-seconds 1200
```

Synchronize historical data and table metadata to Hive

The history_import_and_meta_sync branch provides operations for synchronizing historical data to the Hudi table and for synchronizing the Hudi table's data structure to Hive Meta

Synchronize historical data to the Hudi table

The idea here is

  • First import the full MySQL data into a Hive table using a tool such as Sqoop
  • Then import that data into the Hudi table using the HiveImport2HudiConfig tool in the branch code (a rough sketch of this step follows the list)
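
For orientation only, here is a hedged sketch of what the second step boils down to: reading the Hive history table with Spark and bulk-inserting it into a Hudi table. This is not the HiveImport2HudiConfig implementation; the database, table, key, and path names simply reuse values from the demos in this article.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HiveHistoryToHudiSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("hive-history-to-hudi")
                .enableHiveSupport()   // needed to read the Hive table written by Sqoop
                .getOrCreate();

        // The full MySQL history, previously imported into Hive (step 1).
        Dataset<Row> history = spark.table("hudi_temp.crm__wx_user_info");

        history.write()
               .format("org.apache.hudi")
               .option("hoodie.datasource.write.operation", "bulk_insert") // one-off load of history
               .option("hoodie.datasource.write.recordkey.field", "id")
               // Assumes a _partition_key column has already been derived from the time field.
               .option("hoodie.datasource.write.partitionpath.field", "_partition_key")
               .option("hoodie.datasource.write.precombine.field", "id")
               .option("hoodie.table.name", "crm__wx_user_info")
               .mode(SaveMode.Append)
               .save("hdfs://192.168.2.2:8020/hudi_table/crm__wx_user_info");
    }
}
```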

HiveImport2HudiConfig provides the following parameters to configure the program execution behavior

| Parameter | Meaning | Required | Default |
| --- | --- | --- | --- |
| --base-save-path | Base HDFS path under which Hudi tables are stored, e.g. hdfs://192.168.16.181:8020/hudi_data/ | yes | none |
| --mapping-mysql-db-name | Name of the MySQL database to process | yes | none |
| --mapping-mysql-table-name | Name of the MySQL table to process | yes | none |
| --store-table-name | Hudi table name | no | Generated from --mapping-mysql-db-name and --mapping-mysql-table-name. If --mapping-mysql-db-name is crm and --mapping-mysql-table-name is order, the final Hudi table is named crm__order |
| --real-save-path | Final HDFS path where the Hudi table is stored | no | Generated from --base-save-path and --store-table-name as '--base-save-path' + '/' + '--store-table-name'; the default is recommended |
| --primary-key | Field that uniquely identifies a record in the synchronized Hive history table | no | id |
| --partition-key | Time field in the Hive history table used for partitioning; must be of type timestamp or datetime | yes | none |
| --precombine-key | Used to configure Hudi's hoodie.datasource.write.precombine.field | no | id |
| --sync-hive-db-name | Hive database holding the full historical data | yes | none |
| --sync-hive-table-name | Hive table holding the full historical data | yes | none |
| --hive-base-path | Root path of all Hive data files; see the Hive configuration | no | /user/hive/warehouse |
| --hive-site-path | Path of the hive-site.xml configuration file | yes | none |
| --tmp-data-path | Path for temporary files during execution. The default is /tmp; if the disk holding /tmp is too small the history import may fail, in which case this parameter can point to another path | no | the operating system's temporary directory |

An execution demo

```shell
nohup java -jar hudi-learn-1.0-SNAPSHOT.jar \
    --sync-hive-db-name hudi_temp \
    --sync-hive-table-name crm__wx_user_info \
    --base-save-path hdfs://192.168.2.2:8020/hudi_table/ \
    --mapping-mysql-db-name crm \
    --mapping-mysql-table-name "order" \
    --primary-key "id" \
    --partition-key created_date \
    --hive-site-path /etc/lib/hive/conf/hive-site.xml \
    --tmp-data-path /data/tmp > order.log &
```

Synchronize the Hudi table structure to Hive Meta

The Hudi data structure and partitions need to be synchronized to Hive Meta in the form of Hive external tables, so that Hive is aware of the Hudi data and can query and analyze it using SQL. Hudi can synchronize table metadata to Hive while it consumes the binlog and writes data, but every write to the Apache Hudi table would then also read and write Hive Meta, which may put heavy pressure on Hive's performance. So I developed a separate tool, HiveMetaSyncConfig, to synchronize Hudi table metadata to Hive. Since the program currently only supports daily partitions, the synchronization tool can simply be run once a day. The parameters are configured as follows:

| Parameter | Meaning | Required | Default |
| --- | --- | --- | --- |
| --hive-db-name | Hive database to which the Hudi table is synchronized | yes | none |
| --hive-table-name | Hive table to which the Hudi table is synchronized | yes | none |
| --hive-jdbc-url | JDBC URL of the Hive meta service, e.g. jdbc:hive2://192.168.16.181:10000 | yes | none |
| --hive-user-name | User name for the Hive meta connection | no | hive |
| --hive-pwd | Password for the Hive meta connection | no | hive |
| --hudi-table-path | HDFS path where the Hudi table files reside | yes | none |
| --hive-site-path | Path of Hive's hive-site.xml | yes | none |

An execution demo

```shell
java -jar hudi-learn-1.0-SNAPSHOT.jar \
    --hive-db-name streaming \
    --hive-table-name crm__order \
    --hive-user-name hive \
    --hive-pwd hive \
    --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 \
    --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order \
    --hive-site-path /lib/hive/conf/hive-site.xml
```

Pitfalls encountered

Hive Configuration

Some Hive clusters have hive.input.format set to org.apache.hadoop.hive.ql.io.CombineHiveInputFormat by default. This causes the Hive external table mounted on the Hudi data to read all of Hudi's Parquet files, so the query results contain duplicates. The input format needs to be changed to org.apache.hadoop.hive.ql.io.HiveInputFormat. To avoid a cluster-wide change that needlessly affects other offline Hive SQL, it is recommended to set it only for the current Hive session: set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
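
If the session-level setting has to be applied from code rather than typed into a Hive CLI or Beeline session, a sketch over plain Hive JDBC could look like the following (the hive-jdbc driver must be on the classpath). The JDBC URL and credentials reuse the demo values above, and the query itself is hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveSessionInputFormatExample {
    public static void main(String[] args) throws Exception {
        // Example connection values reused from this article's demo; adjust to your environment.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://192.168.16.181:10000", "hive", "hive");
             Statement stmt = conn.createStatement()) {

            // Session-level override, so other offline Hive SQL is not affected.
            stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");

            // Hypothetical query against the Hudi-backed external table.
            try (ResultSet rs = stmt.executeQuery("select count(*) from streaming.crm__order")) {
                while (rs.next()) {
                    System.out.println("row count: " + rs.getLong(1));
                }
            }
        }
    }
}
```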

Some tuning of Spark Streaming

Since writing the binlog to Hudi tables is implemented on Spark Streaming, here are some Spark and Spark Streaming level configurations that make the whole program run more stably

| Configuration | Meaning |
| --- | --- |
| spark.streaming.backpressure.enabled=true | Lets Spark Streaming adjust the consumption rate based on the previous batch, so the program is not overwhelmed and does not crash |
| spark.ui.retainedJobs=10, spark.ui.retainedStages=10, spark.ui.retainedTasks=10, spark.worker.ui.retainedExecutors=10, spark.worker.ui.retainedDrivers=10, spark.sql.ui.retainedExecutions=10 | By default, Spark keeps the history of stages and tasks produced during execution in driver memory; if driver memory is too small, this can crash the driver. These parameters cap how much history is retained and so reduce memory usage |
| spark.yarn.maxAppAttempts=4 | Number of times the driver is restarted after it crashes |
| spark.yarn.am.attemptFailuresValidityInterval=1h | If the driver crashes only once a week, we would rather it always be restarted than be stopped for good after four crashes in total; this sets the interval after which the maxAppAttempts counter is reset |
| spark.yarn.max.executor.failures=20 | An executor may fail; after a failure the cluster automatically allocates a new one. This sets the maximum number of executor failures allowed; once exceeded, the program reports an error such as "Max number of executor failures (400) reached" and exits |
| spark.yarn.executor.failuresValidityInterval=1h | Interval after which the count of failed executor reallocations is reset |
| spark.task.maxFailures=8 | Number of task failures allowed |
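
The same settings can also be baked into the program instead of being passed as --conf flags. Below is a minimal sketch, assuming a Java Spark Streaming entry point similar in spirit to the one in this article; the class name and the subset of settings shown are illustrative only.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingTuningSketch {
    public static void main(String[] args) {
        // A few of the stability settings, applied in code instead of via spark-submit --conf flags.
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")   // local master only so the sketch runs standalone
                .setAppName("hudi-binlog-consumer")
                .set("spark.streaming.backpressure.enabled", "true")
                .set("spark.streaming.kafka.maxRatePerPartition", "200")
                .set("spark.task.maxFailures", "8");

        // Micro-batch duration, the same knob as the tool's --duration-seconds parameter.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // The Kafka DStream and the Hudi upsert logic would be wired up here before jssc.start().
        jssc.stop();
    }
}
```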

Future improvements

  • Support non-partitioned tables or tables not partitioned by date. Currently only date-partitioned tables are supported
  • Support more data types. Currently, for the stability of the program, all MySQL fields are stored in Hudi as String

References

hudi.apache.org/

Welcome to follow my personal account “North by Northwest UP”, where I record the coding life, industry thoughts, and technology commentary