Preface

Hive has become a core component of the data warehouse ecosystem. It is not only an SQL engine for big data analytics and ETL, but also a data management platform for discovering, defining, and evolving data. Flink is currently the most popular stream processing engine and can perform stateful computations over both unbounded and bounded data streams. Flink has supported Hive integration since version 1.9, although that release was in beta and not recommended for production use. In version 1.10, with the completion of the Alibaba Blink merge, Flink's Hive integration reached production readiness. Note that different versions of Flink integrate with Hive differently. This article uses Flink 1.12 and Hive 3.1.2 as an example to briefly explain the steps for integrating Flink with Hive.

How the integration works

Flink’s integration with Hive consists of the following two layers:

  • First, Flink uses Hive's Metastore as a persistent catalog: through HiveCatalog, Flink metadata can be stored across sessions in the Hive Metastore. For example, we can use HiveCatalog to store the metadata of a Kafka or Elasticsearch table in the Hive Metastore and reuse it in subsequent SQL queries.

  • Second, Flink can read from and write to Hive tables, just as you would query Hive data with SparkSQL or Impala.

HiveCatalog is designed to provide compatibility with Hive "out of the box": users can access their existing Hive warehouse without modifying the existing Hive Metastore or changing the data placement or partitioning of their tables.
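As a minimal sketch of the first layer (the catalog registration itself is covered later in this article; the clicks table and its options are purely illustrative):

-- Switch to a registered HiveCatalog; DDL executed from here on is
-- persisted in the Hive Metastore instead of the session's memory.
USE CATALOG myhive;

-- The schema of this Kafka-backed table outlives the session and can be
-- queried later without re-declaring it.
CREATE TABLE clicks (
    `url` STRING,
    `ts`  BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'clicks',
    'properties.bootstrap.servers' = 'localhost:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);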

Supported Hive versions

Flink supports the following Hive versions.

Major version    Supported minor versions
1.0              1.0.0, 1.0.1
1.1              1.1.0, 1.1.1
1.2              1.2.0, 1.2.1, 1.2.2
2.0              2.0.0, 2.0.1
2.1              2.1.0, 2.1.1
2.2              2.2.0
2.3              2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4, 2.3.5, 2.3.6
3.1              3.1.0, 3.1.1, 3.1.2

Please note that certain features are only available in certain Hive versions; these limitations come from Hive itself, not from Flink:

  • Hive built-in functions are supported when using Hive-1.2.0 and later.
  • Column constraints, i.e. PRIMARY KEY and NOT NULL, are supported when using Hive-3.1.0 and later (see the sketch after this list).
  • Altering table statistics is supported when using Hive-1.2.0 and later.
  • DATE column statistics are supported when using Hive-1.2.0 and later.
  • Writing to ORC tables is not supported when using Hive-2.0.x.
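On the Hive side, the constraint support mentioned above looks roughly like this (a sketch using Hive 3.1 syntax; the users table is hypothetical):

-- Hive 3.1+: NOT NULL plus an informational (non-validated) primary key
CREATE TABLE users (
    id   INT NOT NULL,
    name STRING,
    PRIMARY KEY (id) DISABLE NOVALIDATE
);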

Dependencies

Integrating Flink with Hive requires adding some dependency JARs to the lib folder of the Flink installation directory, so that Flink can interact with Hive through the Table API or the SQL Client.

The Flink website offers two ways to add the Hive dependencies. The first is to use the Hive JAR provided by Flink, choosing the JAR that matches the Metastore version you are using. The second is to add each required JAR separately, which is useful, for example, when the Hive version you are using does not match any of the Hive JARs provided by Flink.

Note: It is recommended to use the Hive JAR provided by Flink.

Use the Hive JAR provided by Flink

The following lists all available Hive JARs; download the one that matches the Hive version you are using. For example, this article uses Hive 3.1.2, so we simply download flink-sql-connector-hive-3.1.2.jar and place it in the lib folder of the Flink installation directory.

Metastore version    Maven dependency / SQL Client JAR
1.0.0 ~ 1.2.2        flink-sql-connector-hive-1.2.2
2.0.0 ~ 2.2.0        flink-sql-connector-hive-2.2.0
2.3.0 ~ 2.3.6        flink-sql-connector-hive-2.3.6
3.0.0 ~ 3.1.2        flink-sql-connector-hive-3.1.2

User-defined dependencies

If you do not use the flink-sql-connector-hive-3.1.2.jar provided by Flink, you can add the required JARs individually. The JARs needed to start the Flink SQL Cli are listed below:

Dependent JAR                                           Download address
flink-connector-hive_2.12-1.12.0.jar                    download
flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar    download
hive-exec-3.1.2.jar                                     lib directory of the Hive installation
libfb303-0.9.3.jar                                      lib directory of the Hive installation

Connecting to Hive

This article uses the Flink SQL Cli to connect to Hive.

Configure sql-client-defaults.yaml

The sql-client-defaults.yaml file is the configuration file used when the Flink SQL Cli starts; it is located in the conf folder of the Flink installation directory. The specific configuration is as follows:

catalogs:
    - name: myhive
      type: hive
      default-database: default
      hive-conf-dir: /opt/hive/conf/
      hadoop-conf-dir: /opt/hadoop/etc/hadoop/

The following parameters are supported when defining a HiveCatalog through a YAML file or DDL:

  • type (required, String): The type of the catalog. Must be set to 'hive' when creating a HiveCatalog.
  • name (required, String): The name of the catalog. Required only when using a YAML file.
  • hive-conf-dir (optional, String): URI of the directory containing hive-site.xml. The URI must be of a type supported by the Hadoop file system. If a relative URI without a scheme is specified, it defaults to the local file system. If this parameter is not specified, hive-site.xml is searched for on the classpath.
  • default-database (optional, String, default: default): The database to use as the current database when the catalog is set as the current catalog.
  • hive-version (optional, String): HiveCatalog automatically detects the Hive version in use. We recommend not setting the Hive version manually unless the automatic detection mechanism fails.
  • hadoop-conf-dir (optional, String): Path to the Hadoop configuration directory. Currently, only local file system paths are supported. We recommend using the HADOOP_CONF_DIR environment variable to specify the Hadoop configuration, and using this parameter only when the environment variable does not meet your needs, for example when you want to set the Hadoop configuration separately for each HiveCatalog.
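For reference, the same catalog can also be registered with DDL instead of YAML. A minimal sketch mirroring the configuration above:

-- DDL equivalent of the YAML catalog definition shown earlier
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/hive/conf/'
);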

Operating on Hive tables

Start the Flink SQL Cli as follows:

sql-client.sh embedded

The startup fails with an error: Flink does not support an embedded Metastore when integrating with Hive. You need to start the Hive Metastore service and set the correct value for hive.metastore.uris in the conf/hive-site.xml configuration file.

Start the Hive MetaStore service

hive --service metastore

Configure hive.metastore.uris in hive-site.xml

<property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>

After starting the Flink SQL Cli again, this time successfully, we can view the registered catalogs:

Flink SQL> show catalogs;
default_catalog
myhive

Use the registered myhive catalog:

Flink SQL> use catalog myhive;

List the databases. proghive is the Hive database I created while working through the Hive Programming Guide:

Flink SQL> show databases;
default
proghive

View all tables

Flink SQL> use proghive;
Flink SQL> show tables;
dividends
employees
stocks

Query the employees table in Hive:

hive> select * from employees;
OK
John Doe	100000.0	["Mary Smith","Todd Jones"]	{"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1}	{"street":"1 Michigan Ave.","city":"Chicago","state":"IL","zip":60600}
Mary Smith	80000.0	["Bill King"]	{"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1}	{"street":"100 Ontario St.","city":"Chicago","state":"IL","zip":60601}
Todd Jones	70000.0	[]	{"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1}	{"street":"200 Chicago Ave.","city":"Oak Park","state":"IL","zip":60700}
Bill King	60000.0	[]	{"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1}	{"street":"300 Obscure Dr.","city":"Obscuria","state":"IL","zip":60100}
Boss Man	200000.0	["John Doe","Fred Finance"]	{"Federal Taxes":0.3,"State Taxes":0.07,"Insurance":0.05}	{"street":"1 Pretentious Drive.","city":"Chicago","state":"IL","zip":60500}
Fred Finance	150000.0	["Stacy Accountant"]	{"Federal Taxes":0.3,"State Taxes":0.07,"Insurance":0.05}	{"street":"2 Pretentious Drive.","city":"Chicago","state":"IL","zip":60500}
Stacy Accountant	60000.0	[]	{"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1}	{"street":"300 Main St.","city":"Naperville","state":"IL","zip":60563}
Time taken: 0.21 seconds, Fetched: 7 row(s)

Now let’s use Flink SQL to query tables in Hive.

Flink SQL> select * from employees;
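Beyond a full scan, standard SQL works against the Hive table. For example, an aggregate over the salary column (a sketch; salary is the second column of the employees table shown above):

-- Average salary across the employees table, computed by Flink
SELECT COUNT(*) AS cnt, AVG(salary) AS avg_salary
FROM employees;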

Create a Kafka data source table in the Flink SQL Cli:

CREATE TABLE user_behavior (
    `user_id` BIGINT,     -- user id
    `item_id` BIGINT,     -- item id
    `cat_id` BIGINT,      -- category id
    `action` STRING,      -- user behavior
    `province` INT,       -- user's province
    `ts` BIGINT,          -- timestamp of the user action
    `proctime` AS PROCTIME(),  -- processing-time attribute generated via a computed column
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')),  -- event time
    WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- define the watermark
) WITH (
    'connector' = 'kafka',                              -- use the Kafka connector
    'topic' = 'user_behavior',                          -- kafka topic
    'scan.startup.mode' = 'earliest-offset',            -- start from the earliest offset
    'properties.group.id' = 'group1',                   -- consumer group
    'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format' = 'json',                                  -- the source data format is JSON
    'json.fail-on-missing-field' = 'true',              -- fail on missing fields
    'json.ignore-parse-errors' = 'false'                -- do not silently skip parse errors
);
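The eventTime column and the watermark defined above are what enable event-time windowing on this table. As an illustration (a sketch; it assumes events are flowing into the user_behavior topic), a tumbling-window count of user actions per minute:

-- Count user actions in one-minute event-time windows
SELECT
    TUMBLE_START(eventTime, INTERVAL '1' MINUTE) AS window_start,
    COUNT(*) AS action_cnt
FROM user_behavior
GROUP BY TUMBLE(eventTime, INTERVAL '1' MINUTE);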

View table structure

Flink SQL> DESCRIBE user_behavior;

You can run the following command in the Hive client to view the table just created in the Flink SQL Cli:

hive> desc formatted  user_behavior;
OK
# col_name            	data_type           	comment

# Detailed Table Information
Database:           	proghive
OwnerType:          	USER
Owner:              	null
CreateTime:         	Thu Dec 24 15:52:18 CST 2020
LastAccessTime:     	UNKNOWN
Retention:          	0
Location:           	hdfs://localhost:9000/user/hive/warehouse/proghive.db/user_behavior
Table Type:         	MANAGED_TABLE
Table Parameters:
	flink.connector     	kafka
	flink.format        	json
	flink.json.fail-on-missing-field	true
	flink.json.ignore-parse-errors	false
	flink.properties.bootstrap.servers	localhost:9092
	flink.properties.group.id	group1
	flink.scan.startup.mode	earliest-offset
	flink.schema.0.data-type	BIGINT
	flink.schema.0.name 	user_id
	flink.schema.1.data-type	BIGINT
	flink.schema.1.name 	item_id
	flink.schema.2.data-type	BIGINT
	flink.schema.2.name 	cat_id
	flink.schema.3.data-type	VARCHAR(2147483647)
	flink.schema.3.name 	action
	flink.schema.4.data-type	INT
	flink.schema.4.name 	province
	flink.schema.5.data-type	BIGINT
	flink.schema.5.name 	ts
	flink.schema.6.data-type	TIMESTAMP(3) NOT NULL
	flink.schema.6.expr 	PROCTIME()
	flink.schema.6.name 	proctime
	flink.schema.7.data-type	TIMESTAMP(3)
	flink.schema.7.expr 	TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))
	flink.schema.7.name 	eventTime
	flink.schema.watermark.0.rowtime	eventTime
	flink.schema.watermark.0.strategy.data-type	TIMESTAMP(3)
	flink.schema.watermark.0.strategy.expr	`eventTime` - INTERVAL '5' SECOND
	flink.topic         	user_behavior
	is_generic          	true
	transient_lastDdlTime	1608796338

# Storage Information
SerDe Library:      	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:        	org.apache.hadoop.mapred.TextInputFormat
OutputFormat:       	org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:         	No
Num Buckets:        	-1
Bucket Columns:     	[]
Sort Columns:       	[]
Storage Desc Params:
	serialization.format	1
Time taken: 0.212 seconds, Fetched: 54 row(s)                

The metadata of the user_behavior table created in the Flink SQL Cli is persisted in the Hive metastore database (this article uses MySQL). Execute the following query against the metastore database:

SELECT
    a.tbl_id,                                     -- table id
    from_unixtime(a.create_time) AS create_time, -- creation time
    a.db_id,                                      -- database id
    b.name AS db_name,                            -- database name
    a.tbl_name                                    -- table name
FROM TBLS AS a
LEFT JOIN DBS AS b ON a.db_id = b.db_id
WHERE a.tbl_name = 'user_behavior';
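The flink.* table properties shown earlier by desc formatted are stored in the TABLE_PARAMS table of the metastore database. A sketch of how to inspect them:

-- List the Flink-specific properties persisted for the user_behavior table
SELECT param_key, param_value
FROM TABLE_PARAMS
WHERE tbl_id = (SELECT tbl_id FROM TBLS WHERE tbl_name = 'user_behavior');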