If Flink can handle the application scenarios of batch computing well, it can reduce the cost of development and maintenance when using Flink, and can enrich the Flink ecosystem. SQL is a common tool in batch computing, so Flink also uses SQL as the main interface for batch computing. Flink batch processing design and Hive integration. It is mainly divided into the following three points:

  1. Design architecture
  2. Project progress
  3. The performance test

Design architecture

First, I would like to share with you the Flink batch design architecture.

1. The background

The main reason for Flink to improve batch processing is to reduce customer maintenance costs and update costs and better improve the Flink ecosystem. SQL is a very important tool in batch computing scenarios, so WE want to use SQL as the main interface in batch computing scenarios, so we focus on optimizing the function of Flink SQL. At present, Flink SQL mainly has the following points to optimize:

  • A complete metadata management regime is required.
  • Lack of support for DDL(Data Definition Language DDL for creating various objects in a database such as tables, views, indexes, synonyms, clusters, and so on).
  • Hive is the earliest SQL engine in the field of big data. Hive has a wide user base. Some new SQL tools, such as Spark SQL and Impala, provide the interconnection function with Hive. This allows users to better migrate their applications from Hive, so connecting to Hive is also important for Flink SQL.

2. The target

So we need to accomplish the following:

  • Defining a unified Catalog interface is a prerequisite for making Flink SQL easier to interconnect with the outside world. If you have used TableSource and TableSink of Flink to connect to tables in external systems, you will find that whether you write a program or configure a YAML file, you will use it in a somewhat different way than traditional SQL. So we definitely do not want Hive users to migrate Flink SQL needs to define TableSouces and TableSink to interact with Hive. So we provided a new Catalog interface to interact with Hive in a way that more closely resembles traditional SQL.
  • Provides memory – and persistent-based implementations. Memory-based is Flink’s original approach, where the lifetime of all metadata is tied to the user’s Session, and all metadata is lost after the Session ends. Because you interact with Hive, you must provide a persistent Catalog.
  • Hive interoperability is supported. With the Catalog, users can access Hive metadata through the Catalog. Data Connector is provided for users to read and write Hive Data through Flink to realize Flink and Hive interaction.
  • Support Flink as Hive computing engine (long term goal), like Hive On Spark, Hive On Tez.

3. Newly designed Catalog API (FLIP-30)

Flink creates a TableEnvironment that loads and configures the CatalogManager instance. Flink creates a TableEnvironment that loads and configures the CatalogManager instance. In addition, Catalog supports multiple metadata types, such as Table, Database, function, view, partition, etc. In version 1.9.0, Catalog will have two implementations:

  • One is the memory-based GenericinMemoryCatalog.
  • HiveCatalog uses HiveShim to interact with Hive Metasotre to manipulate Hive metadata. HiveShim resolves the incompatibility of Hive Metastore in Hive versions.

From this implementation, users can create multiple Catalogs and access multiple Hive Metastores to query across catalogs.

4. Read and write Hive data

With metadata we can implement Flink SQL Data Connector to read and write actual Hive Data. Data written by Flink SQL must be compatible with Hive data format, that is, Hive can read data written by Flink and vice versa. To achieve this, we reuse a lot of Hive Input/Output Format, SerDe and other apis, one is to reduce code redundancy, two is as far as possible to maintain compatibility.

The implementation classes for reading Hive table Data in Data Connect are HiveTableSource and HiveTableInputFormat. The implementation classes for writing Hive tables are HiveTableSink and HiveTableOutputFormat.

Project progress

Next, I will share with you the current status of Flink 1.9.0, new features in 1.10.0 and future work.

1. Current status of Flink 1.9.0

Flink SQL was released as a trial feature in version 1.9.0, and its features are not yet complete:

  • The supported data types are incomplete. (Data types with parameters in 1.9.0 are basically not supported: DECIMAL,CHAR, etc.)
  • Support for partitioned tables is incomplete; partitioned tables can only be read, not written.
  • INSERT OVERWRITE of tables is not supported.

2. New features in Flink 1.10.0

Flink SQL has been further developed in version 1.10.0 and is more fully integrated with Hive.

  • Supports reading and writing static and dynamic partitioned tables.
  • INSERT OVERWRITE is supported at both the table level and partition level.
  • More data types are supported. (All types except UNION are supported)
  • Support for more DDL. (the CREATE TABLE/DATABASE)
  • Supports calling Hive built-in functions in Flink. (About 200 built-in Hive functions)
  • More Hive versions are supported. (Hive 1.0.0 to 3.1.1)
  • There are a lot of performance optimizations like Project/Predicate Pushdown, vector reading ORC data, etc.

3. The Module interface

To enable users to call the built-in functions in Flink SQL that call Hive, we introduced a Module interface in Flink 1.10. This Module is designed to enable users to easily access the built-in functions of the external system into the system.

  • Similar to Catalog, you can configure the Module using the Table API or the Yaml file.
  • Modules can load multiple functions at the same time. When Flink parses functions, it looks for functions in multiple modules according to the loading order of modules. If two modules contain a Function with the same name, the Module loaded first provides the Function definition.
  • There are currently two implementations of Module. CoreModule provides Flink native built-in functions, and HiveModule provides Hive built-in functions.

4. Future jobs

The future work is mainly to complete functions first, including:

  • View support (possible in 1.11).
  • Continuously improve the ease of use of THE SQL CLI. The query results can be displayed in page turning mode and then in scroll mode. In addition, Hive supports the -e-f non-interactive mode.
  • Supports all common Hive DDLS, such AS CREATE TABLE AS.
  • Compatible with Hive syntax, so that the original Hive projects in Flink smooth migration.
  • Supports SQL CLI remote connection mode, similar to HiveServer2 remote connection mode.
  • Supports streaming data writing to Hive.

The performance test

Here are Flink’s test environments and results compared to HiveMR tests in a batch job.

1. Test environment

First, our test environment used a physical cluster of 21 nodes, one Master node and 20 Slave nodes. The hardware configuration of the nodes is 32 cores, 64 threads, 256 memory, the network is port aggregation, and each machine has 12 HDDS.

2. Test tools

The test tool uses Hortonworks’ Hive-TestBench, an open source tool on Github. We used the tool to generate a 10TB TPC-DS test dataset and then tested the dataset tPC-DS using Flink SQL and Hive respectively.

On the one hand, we compare the performance of Flink and Hive. On the other hand, we verify that Flink SQL can access Hive data well. Hive version 3.1.1 was used for testing, and Flink used the Master branch code.

3. Test results

Test results Flink SQL achieved about 7 times better performance than Hive On MapReduce. This is thanks to a number of optimizations made by Flink SQL, such as in scheduling and execution plans. In general, if you are using Hive On MapReduce, migrating to Flink SQL will provide a significant performance improvement.

Flink 1.10 vs. Hive 3.0

About the author:

Rui Li (Tianyi), Alibaba technical expert, member of Apache Hive PMC. Before joining Alibaba, he worked for Intel, IBM and other companies, mainly involved in Hive, HDFS, Spark and other open source projects.

Wang Gang (Qiao Ran), Alibaba Senior Development Engineer, Flink Contributor. After graduation from the computer department of Zhejiang University, I worked in Mogujie Data platform, engaged in the development of data exchange system. At present, I mainly focus on Flink and Hive ecological construction in Ali.