Abstract: This article describes how to use Hudi's DeltaStreamer tool to ingest data into the data lake in real time.

This article is shared from the Huawei Cloud community post "Huawei FusionInsight MRS in Practice: Best Practices for Real-Time Hudi Lake Ingestion with the DeltaStreamer Tool" by Jin Hongqing.

Background

The architecture of a traditional big data platform is designed for offline data processing, and the common way to import data is periodic batch jobs with Sqoop. As the demand for real-time data analysis grows, hourly or even minute-level data synchronization is becoming more and more common, which has driven the development of (quasi-) real-time synchronization systems built on the Spark/Flink stream processing engines.

However, real-time synchronization faced several challenges from the start:

  • Small files. In Spark's micro-batch mode or Flink's record-at-a-time processing mode, each write to HDFS produces files of only a few MB or even tens of KB. Over time this generates a large number of small files, putting heavy pressure on the HDFS NameNode.
  • Support for updates. HDFS does not support in-place modification of data, so records cannot be updated during synchronization.
  • Transactionality. Whether appending or modifying data, writes must be transactional: data should become visible in HDFS only when the stream job commits, and data that has been written or partially written must be removable when the job rolls back.

Hudi is one solution to these problems: use the DeltaStreamer tool shipped with Hudi to write data to Hudi, and enable --enable-hive-sync to synchronize the data to Hive tables.

Introduction to the Hudi DeltaStreamer write tool

For how to use the DeltaStreamer tool, see the reference: hudi.apache.org/cn/docs/wri…

The HoodieDeltaStreamer utility (part of hudi-utilities-bundle) provides a way to ingest data from different sources, such as DFS or Kafka, and has the following features:

  • Exactly-once ingestion of new events from Kafka, incremental imports from Sqoop, the output of HiveIncrementalPuller, or multiple files in a DFS folder
  • Support for incoming data in JSON, Avro, or custom record types
  • Management of checkpoints, rollback, and recovery
  • Avro schemas from DFS or the Confluent Schema Registry
  • Support for custom transformation operations

Scenario

  1. Production database changes are captured in real time by the CDC tool (Debezium) and written to a specific Kafka topic in the MRS cluster.
  2. The DeltaStreamer tool provided by Hudi reads and parses the data from that Kafka topic.
  3. DeltaStreamer writes the processed data to Hive in the MRS cluster.

Introduction to sample data

Create the source database and table in MySQL.
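
The exact DDL is not critical; a minimal sketch of the sample source table, inferred from the columns used later by the sample code and insert statements (the real definition may differ), could look like this:

-- Hypothetical DDL for the sample MySQL source table hudi.hudisource3
CREATE DATABASE IF NOT EXISTS hudi;

CREATE TABLE IF NOT EXISTS hudi.hudisource3 (
  uid       INT PRIMARY KEY,   -- used as the Hudi record key
  uname     VARCHAR(100),
  age       VARCHAR(10),
  sex       VARCHAR(10),
  mostlike  VARCHAR(100),
  lastview  VARCHAR(100),
  totalcost VARCHAR(100)
);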

Introduction to CDC tool Debezium

For the detailed interconnection procedure, see: fusioninsight.github.io/ecosystem/z…

After the interconnection is complete, insert, update, and delete operations on the MySQL production database each produce a corresponding Kafka message.

Insert operation: insert into hudi.hudisource3 values (11, 'Jiang Yutang', '38', 'female', 'map', 'player', '28732');

Corresponding Kafka message body:
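
A simplified sketch of the Debezium JSON envelope for this insert (the schema block and source metadata are omitted, and only the payload fields that MyJsonKafkaSource later reads are shown; the exact message produced by the connector contains more fields):

{
  "payload": {
    "before": null,
    "after": {
      "uid": 11,
      "uname": "Jiang Yutang",
      "age": "38",
      "sex": "female",
      "mostlike": "map",
      "lastview": "player",
      "totalcost": "28732"
    },
    "op": "c"
  }
}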

Update operation: UPDATE hudi.hudisource3 SET uname='Anne Marie333' WHERE uid=11;

Corresponding Kafka message body:
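
A simplified sketch of the update envelope; both the before and after images are populated, and MyJsonKafkaSource keeps the after image for op "u" (other columns omitted for brevity):

{
  "payload": {
    "before": { "uid": 11, "uname": "Jiang Yutang", ... },
    "after":  { "uid": 11, "uname": "Anne Marie333", ... },
    "op": "u"
  }
}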

Delete operation: delete from hudi.hudisource3 where uid=11;

Corresponding Kafka message body:
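
A simplified sketch of the delete envelope; only the before image is populated, which is why MyJsonKafkaSource reads "before" and sets _hoodie_is_deleted to true for op "d" (other columns omitted for brevity):

{
  "payload": {
    "before": { "uid": 11, "uname": "Anne Marie333", ... },
    "after": null,
    "op": "d"
  }
}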

Commissioning procedures

Obtain the Huawei MRS Hudi sample project

Obtain the sample code from GitHub according to your MRS version: github.com/huaweicloud…

Open the project SparkOnHudiJavaExample

Sample code modifications and description

1.debeziumJsonParser

Description: parses the Debezium message body to obtain the op field.

The source code is as follows:

package com.huawei.bigdata.hudi.examples;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;

/**
 * Parses the Debezium message body and extracts the op field ("c", "u", or "d").
 */
public class debeziumJsonParser {

    public static String getOP(String message) {
        // The operation type is carried in payload.op of the Debezium envelope
        JSONObject json_obj = JSON.parseObject(message);
        String op = json_obj.getJSONObject("payload").get("op").toString();
        return op;
    }
}

2.MyJsonKafkaSource

Description: by default, DeltaStreamer uses org.apache.hudi.utilities.sources.JsonKafkaSource to consume data from the specified Kafka topic. If data parsing is required during consumption, a custom MyJsonKafkaSource must be written to override it.

The source code, with comments added, is as follows:

package com.huawei.bigdata.hudi.examples;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.util.Map;

/**
 * Read json kafka data.
 */
public class MyJsonKafkaSource extends JsonSource {

    private static final Logger LOG = LogManager.getLogger(MyJsonKafkaSource.class);

    private final KafkaOffsetGen offsetGen;

    private final HoodieDeltaStreamerMetrics metrics;

    public MyJsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
                             SchemaProvider schemaProvider) {
        super(properties, sparkContext, sparkSession, schemaProvider);
        HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder();
        this.metrics = new HoodieDeltaStreamerMetrics(builder.withProperties(properties).build());
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        offsetGen = new KafkaOffsetGen(properties);
    }

    @Override
    protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
        OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
        long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
        LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
        if (totalNewMsgs <= 0) {
            return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
        }
        JavaRDD<String> newDataRDD = toRDD(offsetRanges);
        return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
    }

    private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
        return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges,
                LocationStrategies.PreferConsistent())
                .filter((x) -> {
                    // Filter out empty lines and dirty data
                    String msg = (String) x.value();
                    if (msg == null) {
                        return false;
                    }
                    try {
                        String op = debeziumJsonParser.getOP(msg);
                    } catch (Exception e) {
                        return false;
                    }
                    return true;
                })
                .map((x) -> {
                    // Parse the Debezium payload into a map and return its toString(),
                    // so that downstream structural changes are kept to a minimum
                    String msg = (String) x.value();
                    String op = debeziumJsonParser.getOP(msg);
                    JSONObject json_obj = JSON.parseObject(msg, Feature.OrderedField);
                    Boolean is_delete = false;
                    String out_str = "";
                    Object out_obj = new Object();
                    if (op.equals("c")) {
                        // insert: take the "after" image
                        out_obj = json_obj.getJSONObject("payload").get("after");
                    } else if (op.equals("u")) {
                        // update: take the "after" image
                        out_obj = json_obj.getJSONObject("payload").get("after");
                    } else {
                        // delete: take the "before" image and mark the record as deleted
                        is_delete = true;
                        out_obj = json_obj.getJSONObject("payload").get("before");
                    }
                    Map out_map = (Map) out_obj;
                    out_map.put("_hoodie_is_deleted", is_delete);
                    out_map.put("op", op);
                    return out_map.toString();
                });
    }
}

3.TransformerExample

Description: specifies the fields required when writing to the Hudi and Hive tables.

The source code, with comments added, is as follows:

package com.huawei.bigdata.hudi.examples;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Function description: format the incoming data.
 */
public class TransformerExample implements Transformer, Serializable {

    /**
     * format data
     *
     * @param jsc          JavaSparkContext
     * @param sparkSession SparkSession
     * @param rowDataset   Dataset<Row>
     * @param properties   TypedProperties
     * @return Dataset<Row>
     */
    @Override
    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
                              TypedProperties properties) {
        JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD();
        List<Row> rowList = new ArrayList<>();
        for (Row row : rowJavaRdd.collect()) {
            Row one_row = buildRow(row);
            rowList.add(one_row);
        }
        JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);
        List<StructField> fields = new ArrayList<>();
        builFields(fields);
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema);
        return dataFrame;
    }

    private void builFields(List<StructField> fields) {
        fields.add(DataTypes.createStructField("uid", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("uname", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));
        fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));
    }

    private Row buildRow(Row row) {
        Integer uid = row.getInt(0);
        String uname = row.getString(1);
        String age = row.getString(2);
        String sex = row.getString(3);
        String mostlike = row.getString(4);
        String lastview = row.getString(5);
        String totalcost = row.getString(6);
        Boolean _hoodie_is_deleted = row.getBoolean(7);
        String op = row.getString(8);
        Row returnRow = RowFactory.create(uid, uname, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, op);
        return returnRow;
    }
}

4.DataSchemaProviderExample

Description: provides the source schema for the data returned by MyJsonKafkaSource and the target schema for the data written by TransformerExample.

The source code is as follows:

package com.huawei.bigdata.hudi.examples;

import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * Function description: provide the source and target schemas.
 */
public class DataSchemaProviderExample extends SchemaProvider {

    public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) {
        super(props, jssc);
    }

    /**
     * source schema
     *
     * @return Schema
     */
    @Override
    public Schema getSourceSchema() {
        Schema avroSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");
        return avroSchema;
    }

    /**
     * target schema
     *
     * @return Schema
     */
    @Override
    public Schema getTargetSchema() {
        Schema avroSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");
        return avroSchema;
    }
}

Upload the project package (hudi-security-examples-0.7.0.jar) and the JSON parsing package (fastjson-1.2.4.jar) to the MRS client.

DeltaStreamer Start command

Log in to the client and run the following commands to load the environment variables and perform authentication:

source /opt/hadoopclient/bigdata_env
kinit developuser
source /opt/hadoopclient/Hudi/component_env

DeltaStreamer starts with the following command:

spark-submit --master yarn-client \
--jars /opt/hudi-demo2/fastjson-1.2.4.jar,/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--driver-class-path /opt/hadoopclient/Hudi/hudi/conf:/opt/hadoopclient/Hudi/hudi/lib/*:/opt/hadoopclient/Spark2x/spark/jars/*:/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
spark-internal \
--props file:///opt/hudi-demo2/kafka-source.properties \
--target-base-path /tmp/huditest/delta_demo2 \
--table-type COPY_ON_WRITE \
--target-table delta_demo2 \
--source-ordering-field uid \
--source-class com.huawei.bigdata.hudi.examples.MyJsonKafkaSource \
--schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample \
--transformer-class com.huawei.bigdata.hudi.examples.TransformerExample \
--enable-hive-sync --continuous

kafka-source.properties configuration:

# hudi configuration
hoodie.datasource.write.recordkey.field=uid
hoodie.datasource.write.partitionpath.field=
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.write.hive_style_partitioning=true
hoodie.delete.shuffle.parallelism=10
hoodie.upsert.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.insert.shuffle.parallelism=10
hoodie.finalize.write.parallelism=10
hoodie.cleaner.parallelism=10
hoodie.datasource.write.precombine.field=uid
hoodie.base.path=/tmp/huditest/delta_demo2
hoodie.timeline.layout.version=1

# hive config
hoodie.datasource.hive_sync.table=delta_demo2
hoodie.datasource.hive_sync.partition_fields=
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
hoodie.datasource.hive_sync.use_jdbc=false

# Kafka source topic
hoodie.deltastreamer.source.kafka.topic=hudisource

# checkpoint
hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/delta_demo2/checkpoint/

# Kafka props
bootstrap.servers=172.16.9.117:21005
auto.offset.reset=earliest
group.id=a5
offset.rate.limit=10000

Note: set allow.everyone.if.no.acl.found to true in the Kafka server configuration.

Querying information using Spark

spark-shell --master yarn

val roViewDF = spark.read.format("org.apache.hudi").load("/tmp/huditest/delta_demo2/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select * from  hudi_ro_table").show()

Spark query result of the Hudi table corresponding to the MySQL insert operation:

Spark query result of the Hudi table corresponding to the MySQL update operation:

Spark query result of the Hudi table corresponding to the MySQL delete operation:

Querying data using Hive

beeline

select * from delta_demo2;

Hive query result corresponding to the MySQL insert operation:

Hive query result corresponding to the MySQL update operation:

Hive query result corresponding to the MySQL delete operation:
