Apache Hudi is an open-source data lake framework originally developed at Uber. It entered the Apache Incubator in January 2019 and graduated to a top-level Apache project in May of the following year. It is one of the most popular data lake frameworks.

1. Why decouple

Hudi has used Spark as its data processing engine since its inception. Anyone who wants to use Hudi as their data lake framework must therefore include Spark in their platform's technology stack. Just a few years ago, using Spark as the big data processing engine was common, even natural: Spark can handle batch processing and simulate streaming with micro-batches, unifying stream and batch problems in a single engine. In recent years, however, Flink has matured into another mainstream big data processing engine and has captured a sizable share of the compute-engine market, so Spark is no longer the only dominant choice. In big data communities, forums, and similar venues, requests for Hudi to support the Flink engine began to appear and have grown increasingly frequent. Enabling Hudi to support Flink is therefore a valuable goal, and integrating the Flink engine first requires decoupling Hudi from Spark.

At the same time, every mature, active, and vibrant framework in the big data field is elegantly designed and can be integrated with other components so that each plays to its strengths. Decoupling Hudi from Spark and turning it into an engine-agnostic data lake framework therefore creates more possibilities for integrating Hudi with other components, allowing Hudi to fit better into the big data ecosystem.

2. Difficulty of decoupling

Hudi uses the Spark API internally as routinely as we use List in everyday development. Spark RDD appears everywhere as the main data structure, from reading the data source to writing the table, and even common utility classes are implemented with the Spark API. One could say that Hudi is a general-purpose data lake framework implemented with Spark; its coupling to Spark runs bone-deep.

In addition, the first engine to be integrated after decoupling is Flink, and Flink and Spark differ greatly in their core abstractions. Spark treats data as bounded, and its core abstraction is a finite data set; Flink holds that the essence of data is a stream, and its core abstraction DataStream encapsulates the various operations on that data. Furthermore, Hudi operates on multiple RDDs in several places at once, and processes the result of one RDD jointly with another RDD. These differences in abstraction, together with the reuse of intermediate results in the implementation, make it difficult for the decoupled abstraction to operate on both RDD and DataStream through a uniform API.

3. Decoupling ideas

In theory, Hudi uses Spark as its computing engine merely to take advantage of Spark's distributed computing power and the rich operator capabilities of RDD. Distributed computing aside, Hudi treats RDD mostly as a data structure, and an RDD is essentially a bounded data set, so replacing RDD with a List is theoretically feasible (although some performance might be sacrificed, of course). To preserve the performance and stability of the Spark version of Hudi as much as possible, we can keep the bounded data set as the basic unit of operation and leave Hudi's main operation APIs unchanged, extracting RDD into a generic type: the Spark engine implementation still uses RDD, while other engines use List or another bounded data set as appropriate.

Decoupling principles:

1) Unify the generics. The JavaRDD<HoodieRecord>, JavaRDD<HoodieKey>, and JavaRDD<WriteStatus> used in the Spark API are replaced by the generic types I, K, and O.

2) De-Spark. All APIs in the abstraction layer must be Spark-independent. For operations that are hard to implement in the abstraction layer, rewrite the method as abstract and introduce Spark implementation subclasses.

For example, Hudi uses the JavaSparkContext#map() method in many places; to remove the Spark dependency, JavaSparkContext must be hidden. To solve this, we introduce the HoodieEngineContext#map() method, which hides the implementation details of map and thereby achieves the de-Spark abstraction.

3) Minimize changes to the abstraction layer to preserve Hudi's original functionality and performance;

4) Replace JavaSparkContext with the HoodieEngineContext abstract class to provide the runtime context.

4. Flink integration design

Hudi writes are batch operations by nature, and DeltaStreamer's continuous mode works by running batch operations in a loop. To use the unified API, the Flink integration buffers a batch of data, processes it, and finally commits it as a whole (in Flink we use a List to buffer a batch of data).

The easiest way to implement batching that comes to mind is a time window. However, when a window has no incoming data it also produces no output, which makes it hard for the Sink to judge whether the same batch has finished processing. We therefore use Flink's checkpoint mechanism to delimit batches: each batch of data sits between two barriers. When a subtask has no data, a mock (empty) result is emitted as a placeholder. In this way, once the Sink has received result data from every subtask, it can consider the batch processed and execute the commit.
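To make the idea concrete, here is a minimal sketch of a custom operator built on this "one checkpoint equals one batch" principle. It is an assumed simplification, not Hudi's actual WriteProcessOperator: the element type String stands in for Hudi records, the operator would be attached with DataStream#transform(), and it relies on checkpointing being enabled.

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.List;

/**
 * Sketch only: buffers the elements that arrive between two checkpoint barriers and flushes
 * the whole buffer downstream right before the barrier is forwarded, so that one checkpoint
 * corresponds to exactly one batch. An empty list is still emitted when no data arrived,
 * so the downstream commit sink always receives one result per subtask per batch.
 */
public class CheckpointBatchOperator extends AbstractStreamOperator<List<String>>
    implements OneInputStreamOperator<String, List<String>> {

  private transient List<String> buffer;

  @Override
  public void open() throws Exception {
    super.open();
    buffer = new ArrayList<>();
  }

  @Override
  public void processElement(StreamRecord<String> element) {
    // Collect records belonging to the current batch.
    buffer.add(element.getValue());
  }

  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) {
    // Flush the batch just before the checkpoint barrier travels downstream.
    output.collect(new StreamRecord<>(new ArrayList<>(buffer)));
    buffer.clear();
  }
}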

The DAG is as follows:

  • Source receives Kafka data and converts it into a List<HoodieRecord>.
  • InstantGeneratorOperator generates a globally unique Instant. If the previous Instant is not yet complete or the current batch contains no data, no new Instant is created.
  • KeyBy partitionPath partitions the data by partitionPath to avoid multiple subtasks writing to the same partition.
  • WriteProcessOperator performs the write operation; if the current partition has no data, it sends an empty result to the downstream operator.
  • CommitSink receives the computation results from the upstream tasks; once it has received as many results as the upstream parallelism, it considers all upstream tasks complete and executes the commit (a sketch of this commit logic follows the note below).

Note: Both InstantGeneratorOperator and WriteProcessOperator are custom Flink operators. InstantGeneratorOperator blocks internally to check the state of the previous Instant, ensuring that globally only one Instant is in the inflight (or requested) state. WriteProcessOperator performs the actual write operation, which is triggered at checkpoint time.
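The commit side can be sketched in a similarly simplified way. The class below is an assumed illustration rather than Hudi's actual CommitSink: List<String> stands in for a list of WriteStatus results, the upstream parallelism is passed in explicitly, and commit() is a placeholder for the real Hudi commit call.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Sketch only: each upstream write subtask sends exactly one (possibly empty) result list per
 * batch, so once results from all subtasks have arrived the batch is complete and can be committed.
 */
public class BatchCommitSink extends RichSinkFunction<List<String>> {

  private final int upstreamParallelism;                 // parallelism of the upstream write operator
  private transient List<List<String>> bufferedResults;  // results collected for the current batch

  public BatchCommitSink(int upstreamParallelism) {
    this.upstreamParallelism = upstreamParallelism;
  }

  @Override
  public void open(Configuration parameters) {
    bufferedResults = new ArrayList<>();
  }

  @Override
  public void invoke(List<String> writeResults, Context context) {
    bufferedResults.add(writeResults);
    // Results from every upstream subtask have arrived: the batch is complete.
    if (bufferedResults.size() >= upstreamParallelism) {
      commit(bufferedResults);
      bufferedResults.clear();
    }
  }

  private void commit(List<List<String>> results) {
    // Placeholder: in the real integration this is where the Hudi commit would be executed.
  }
}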

5. Implementation examples

1) HoodieTable

/**
 * Abstract implementation of a HoodieTable.
 *
 * @param <T> Sub type of HoodieRecordPayload
 * @param <I> Type of inputs
 * @param <K> Type of keys
 * @param <O> Type of outputs
 */
public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {

  protected final HoodieWriteConfig config;
  protected final HoodieTableMetaClient metaClient;
  protected final HoodieIndex<T, I, K, O> index;

  public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime, I records);

  public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime, I records);

  public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime, I records,
      Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);

  ...
}

HoodieTable is one of Hudi's core abstractions; it defines insert, upsert, bulkInsert, and the other operations a table supports. Taking upsert as an example, the input data changes from JavaRDD<HoodieRecord> inputRdds to I records, and the runtime JavaSparkContext jsc changes to HoodieEngineContext context.

As the class comment shows, T, I, K, and O stand for the payload data type, input data type, primary-key type, and output data type of Hudi operations, respectively. These generics run through the entire abstraction layer.

2) HoodieEngineContext

/**
 * Base class contains the context information needed by the engine at runtime. It will be extended by different
 * engine implementation if needed.
 */
public abstract class HoodieEngineContext {

  public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);

  public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);

  public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);

  ...
}

HoodieEngineContext plays the role that JavaSparkContext used to play: it not only provides all the information JavaSparkContext provided, but also encapsulates methods such as map, flatMap, and foreach, hiding the concrete implementations of JavaSparkContext#map(), JavaSparkContext#flatMap(), JavaSparkContext#foreach(), and so on.

Take the map method as an example. In the Spark implementation class HoodieSparkEngineContext, the map method looks like this:

  @Override
  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
    return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
  }

In an engine that operates on a List, the implementation can be as follows (pay attention to thread-safety issues across the different methods; use parallel() with caution):

  @Override
  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
    return data.stream().parallel().map(func::apply).collect(Collectors.toList());
  }
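With map() and its siblings hidden behind HoodieEngineContext, code in the abstraction layer can be written once and run on either engine, depending on which context implementation is passed in. A small illustrative sketch (the method and the partition-to-path mapping are made up for this example; Hudi-specific imports are omitted):

import java.util.List;

// Hudi-specific imports (HoodieEngineContext, SerializableFunction) omitted for brevity.
public class EngineAgnosticExample {

  // Builds one output path per input partition. Whether this runs on Spark (parallelize/map/collect)
  // or on a plain parallel stream depends solely on which HoodieEngineContext is passed in.
  public static List<String> toPartitionPaths(HoodieEngineContext context,
                                              List<String> partitions,
                                              String basePath,
                                              int parallelism) {
    return context.map(partitions, partition -> basePath + "/" + partition, parallelism);
  }
}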

Note: Exceptions thrown inside the map function can be handled by wrapping them in SerializableFunction<I, O> func. SerializableFunction is defined as follows:

@FunctionalInterface
public interface SerializableFunction<I, O> extends Serializable {
  O apply(I v1) throws Exception;
}

This method is in fact a variant of java.util.function.Function. Unlike java.util.function.Function, SerializableFunction can be serialized and can throw exceptions. This interface is introduced because JavaSparkContext#map() requires a serializable input argument, and Hudi's logic has many places that need to throw exceptions, while try/catch code inside lambda expressions is bloated and not very elegant.
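As a quick illustration, the snippet below (assuming the SerializableFunction interface shown above is on the classpath; the parseRecord helper is made up for the example) shows a method reference that declares a checked exception being assigned directly to SerializableFunction, which java.util.function.Function would not allow:

import java.io.IOException;

public class SerializableFunctionDemo {

  // A helper that declares a checked exception, as Hudi logic often needs to (made up for the example).
  static String parseRecord(String raw) throws IOException {
    if (raw == null || raw.isEmpty()) {
      throw new IOException("empty record");
    }
    return raw.trim();
  }

  public static void main(String[] args) throws Exception {
    // Compiles because SerializableFunction#apply declares "throws Exception".
    SerializableFunction<String, String> func = SerializableFunctionDemo::parseRecord;
    System.out.println(func.apply("  hoodie  "));

    // The equivalent java.util.function.Function assignment would not compile,
    // because Function#apply declares no checked exceptions:
    // java.util.function.Function<String, String> broken = SerializableFunctionDemo::parseRecord;
  }
}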

6. Status and follow-up plans

6.1 Work timeline

In April 2020, T3 Travel (Yanghua @vinoyang, Wangxianghu @wangxianghu), colleagues from Alibaba (Li Shaofeng @leesf), and several other partners designed and finalized the decoupling scheme;

In April 2020, T3 Travel (Wangxianghu @wangxianghu) completed the coding implementation internally, performed preliminary verification, and concluded that the scheme was feasible;

In July 2020, T3 Travel (Wangxianghu @wangxianghu) introduced this design and implementation, along with a Spark version based on the new abstraction, to the community (HUDI-1089);

On September 26, 2020, SF Express publicly shared, at the Apache Flink Meetup (Shenzhen station), a version refined from T3 Travel's internal branch, becoming the first enterprise in the industry to use Flink to write data into Hudi in production.

On October 2, 2020, HUDI-1089 was merged into Hudi's main branch, marking the completion of the Hudi-Spark decoupling.

6.2 Follow-up Plan

1) Promote Hudi and Flink integration

Bring the Flink and Hudi integration to the community as soon as possible. Initially, this feature may support only Kafka data sources.

2) Performance optimization

To ensure the stability and performance of the Hudi-Spark version, this decoupling did not give much consideration to potential performance issues in the Flink version.

3) Development of a flink-connector-hudi third-party package

Package the Hudi-Flink binding as a third-party module so that users can encode data from any source in a Flink application and write it to Hudi through this package.