Flink uses HiveCatalog to process tables in Hive in batch or stream mode. This means that Flink can not only serve as a Hive batch processing engine, but also read and write Hive tables through streaming processing, thus laying a solid foundation for the application of real-time data warehouse and streaming batch integration practice. This article will take Flink1.12 as an example to introduce Flink integration of Hive another very important aspect — Hive dimension Table JOIN(Temporal Table JOIN) and Flink read and write Hive Table. The following is the full text, I hope this article is helpful to you.

The public account “Big Data Technology and Data Warehouse”, reply to “information” to receive the big data data package

Flink writes to the Hive table

Flink supports writing to Hive tables in Batch and Streaming mode. When Hive tables are written in batch mode, the data can be seen only when the write job ends. Batch writing supports append mode and Overwrite mode.

Write in batch mode

  • Writes data to a non-partitioned table
Flink SQL> use catalog myhive; - the use of the catalog
Flink SQL> INSERT INTO users SELECT 2.'tom';
Flink SQL> set execution.type=batch; -- Use batch mode
Flink SQL> INSERT OVERWRITE users SELECT 2.'tom';
Copy the code
  • Writes data to a partitioned table
Write data to a static partitioned table
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom'.25;
Write data to a dynamically partitioned table
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom'.25.'type_1'.'2019-08-08';

Copy the code

Stream processing mode writes

**Insert overwrite ** is not supported for streaming writing to Hive tables. Otherwise, the following error is reported:

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.
Copy the code

The following example writes Kafka’s data stream to a Hive partition table

Use stream processing mode
Flink SQL> set execution.type=streaming;
-- Use the Hive dialect
Flink SQL> SET table.sql-dialect=hive; 
Create a Hive partition table
CREATE TABLE user_behavior_hive_tbl (
   `user_id` BIGINT.- user id
    `item_id` BIGINT.- the goods id
    `cat_id` BIGINT.- category id
    `action` STRING, -- User behavior
    `province` INT.- Province of the user
    `ts` BIGINT -- The timestamp of the user action
) PARTITIONED BY (dt STRING,hr STRING,mi STRING) STORED AS parquet  TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00'.'sink.partition-commit.trigger'='partition-time'.'sink.partition-commit.delay'='0S'.'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- Use the default SQL dialect
Flink SQL> SET table.sql-dialect=default; 
Create a kafka data source table
CREATE TABLE user_behavior ( 
    `user_id` BIGINT.- user id
    `item_id` BIGINT.- the goods id
    `cat_id` BIGINT.- category id
    `action` STRING, -- User behavior
    `province` INT.- Province of the user
    `ts` BIGINT.-- The timestamp of the user action
    `proctime` AS PROCTIME(), Generate a processing time column by calculating the column
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- Event time
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  - define the watermark
 ) WITH ( 
    'connector' = 'kafka'.Use the Kafka connector
    'topic' = 'user_behaviors'.Theme -- kafka
    'scan.startup.mode' = 'earliest-offset'.- the offset
    'properties.group.id' = 'group1'.-- Consumer Group
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092'.'format' = 'json'.-- The data source format is JSON
    'json.fail-on-missing-field' = 'true'.'json.ignore-parse-errors' = 'false'
);

Copy the code

Hive table attributes:

  • partition.time-extractor.timestamp-pattern

    • Default value :(none)
    • ** DT ∗∗ if the partition is performed by day. If the partition is performed by hour, the value of this attribute is: ‘dt**, if the partition is performed by hour, the value of this property is:’ dt∗∗; if the partition is performed by hour, the value of this property is: ‘year-month −month-month−day hour:00:00′, the value is’ hour:00:00 ‘if the partition is performed by day. The value is’ hour:00:00′ if the partition is performed by day. ‘hour:00:00′, if the partition is performed by day, the value of this attribute is’ dt $hour:00:00 ‘.
  • sink.partition-commit.trigger

    • Default value: process-time
    • Partition trigger type, optionalThe process – time or partition – time.
      • Process-time: no time extractor or water line is required. If the current time is greater than the time when the partition is created + sink.partition-commit.delay, the partition is committed.
      • Partition -time: the partition needs to be defined in the Source table. When watermark > partition time extracted +sink.partition-commit.delay, the partition is committed.
  • sink.partition-commit.delay

    • Default value: 0S
    • Explanation: Delay time of partition submission. If the partition is partitioned by day, the value of this attribute is 1d; if the partition is partitioned by hour, the value of this attribute is 1H.
  • sink.partition-commit.policy.kind

    • Default value :(none)

    • A policy for submitting a partition to inform downstream applications that the partition has been written, that is, data on the partition can be accessed and read. The possible values are as follows:

      • Metastore: adds metadata information about partitions. Only Hive tables support this value
      • success-file: Adds a table to its storage path_SUCCESSfile

      You can configure both values at the same time, such as metastore and success-file

Write to the Hive table by streaming

-- Streaming SQL: Write data to Hive table
INSERT INTO user_behavior_hive_tbl 
SELECT 
    user_id,
    item_id,
    cat_id,
    action,
    province,
    ts,
    FROM_UNIXTIME(ts, 'yyyy-MM-dd'),
    FROM_UNIXTIME(ts, 'HH'),
    FROM_UNIXTIME(ts, 'mm')
FROM user_behavior;

-- Batch SQL: queries partition data of Hive tables
SELECT * FROM user_behavior_hive_tbl WHERE dt='2021-01-04' AND  hr='16' AND mi = '46';
Copy the code

Query partition data in the Hive table:

Scream tips:

1.Flink uses batch mode by default to read Hive tables. If you want to read Hive tables using streaming mode, you must specify some parameters, as described in the following sections.

2. The file changes from in-progress to Finish only after the Checkpoint is completed, and the _SUCCESS file is generated. Therefore, you need to enable and Checkpoint the Flink flow to write to the Hive table. For the Flink SQL Client, CheckPoint is enabled in flink-conf.yaml.

state.backend: filesystem execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION execution.checkpointing.interval: 60s execution.checkpointing.mode: EXACTLY_ONCE state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints

Flink reads the Hive table

Flink supports reading tables in Hive by Batch and Streaming. Batch processing is similar to Hive query. That is, Hive tables are queried only once when the query is submitted. Flow processing will continuously monitor Hive tables and incrementally extract new data. By default, Flink reads Hive tables in batch mode.

Flink supports both partitioned and non-partitioned tables for streaming reading of Hive tables. For partitioned tables, Flink will monitor the newly generated partitioned data and read it incrementally. For non-partitioned tables, Flink monitors new files in the Hive table storage path folder and incrementally reads new data.

Flink can set the following parameters:

  • streaming-source.enable

    • Default value: false
    • Description: Whether to enable streaming reading of Hive tables is disabled by default.
  • streaming-source.partition.include

    • Default value: all
    • Note: You can read Hive partitions in either of the following modes: All or latest. All indicates that data on all partitions is read. Latest indicates that data on only the latest partition is read. Note that the Latest mode can only be used when streaming read of Hive tables is enabled and dimension table JOIN is enabled.
  • streaming-source.monitor-interval

    • Default value: None
    • Description: Continuously monitor the interval between Hive table partitions or files. Note that when the Hive table is read in stream mode, the default value of this parameter is 1m, that is, 1 minute. When temporal Join, the default value is 60m, that is, 1 hour. In addition, the parameter setting should not be too short. The minimum value is 1 hour. In current implementation, each task queries MetaStore, and frequent query may cause excessive pressure on MetaStore.
  • streaming-source.partition-order

    • Default value: partition-name
    • The partition order of the Streaming source. The default partition name is partition-name, which indicates that the latest partitions are loaded in sequence using the default partition name. Two other modes are available: create-time and partition-time. Create-time indicates the time sequence for creating partition files. Partition-time Specifies the partition time sequence. Note that for non-partitioned tables, the default value of this parameter is create-time.
  • streaming-source.consume-start-offset

    • Default value: None
    • Read the start offset of the Hive table by streaming.
  • partition.time-extractor.kind
    • Default value: default
    • Partition time extractor type. Used to extract time from a partition. Default and custom are supported. If default is used, you need to pass the parameterpartition.time-extractor.timestamp-patternConfigure the regular expression for timestamp extraction.

SQL Hint needs to be explicitly enabled in the SQL Client

Flink SQL> set table.dynamic-table-options.enabled= true;  
Copy the code

Run the SQLHint stream to query the Hive table

SELECT * FROM user_behavior_hive_tbl /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2021-01-03') */;
Copy the code

Hive dimension table JOIN

Flink 1.12 supports the function of using the latest Hive partition as temporal table. It can directly associate the latest Hive partition table with the latest Hive partition table in SQL mode. Flink 1.12 automatically monitors the latest Hive partition, and when the new partition is monitored, it will automatically replace the full dimension table data.

Flink supports processing-time temporal Join, which means that the temporal table is always joined with the latest version. In addition, Flink supports both temporal join of non-partitioned tables and temporal Join of partitioned tables. For partitioned tables, Flink listens for the latest partitioned data of Hive tables. It is worth noting that Flink does not support event-time temporal Join yet.

Temporal Join Latest partition

For a Hive partition table that changes over time, Flink can read the table data as an unbounded stream. If each partition of a Hive partitioned table contains a full amount of data, each partition is used as the version data of a temporal table, that is, the latest partition data is used as the full dimension table data. It’s worth noting that this feature only supports Flink’s STREAMING mode.

Before using the latest Hive partition as a Tempmoral table, you need to set the necessary two parameters:

'streaming-source.enable' = 'true'.'streaming-source.partition.include' = 'latest'
Copy the code

There are some other parameters that are explained in the analysis above. When using Hive dimension tables, you can specify specific parameters when creating Hive tables, or you can use SQL Hint to specify parameters dynamically. A Hive dimension table creation template is as follows:

Use Hive SQL dialect
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10.4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
  -- Method 1: Identify the latest partition by partition name (recommended)
  'streaming-source.enable' = 'true'.-- Open Streaming Source
  'streaming-source.partition.include' = 'latest'.Select the latest partition
  'streaming-source.monitor-interval' = '12 h'.The latest partition data is loaded every 12 hours
  'streaming-source.partition-order' = 'partition-name'.Sort by partition name

  Method 2: Sort the creation time of partition files to identify the latest partition
  'streaming-source.enable' = 'true'.'streaming-source.partition.include' = 'latest'.'streaming-source.partition-order' = 'create-time'.Partition file creation time sort
  'streaming-source.monitor-interval' = '12 h'

  Method 3: Identify the latest partitions by partition time
  'streaming-source.enable' = 'true'.'streaming-source.partition.include' = 'latest'.'streaming-source.monitor-interval' = '12 h'.'streaming-source.partition-order' = 'partition-time'.Sort by partition time
  'partition.time-extractor.kind' = 'default'.'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00' 
);

Copy the code

With the Hive dimension table above, we can use this dimension table to JOIN Kafka’s real-time stream data to get the corresponding wide table data.

-- Use the default SQL dialect
SET table.sql-dialect=default;
Kafka real-time streaming data table
CREATE TABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME()
) WITH(...). ;Associate the flow table with the latest hive partition data
SELECT *
FROM orders_table AS orders
JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim 
ON orders.product_id = dim.product_id;
Copy the code

In addition to specifying parameters when defining Hive dimension tables, you can use SQL Hint to dynamically specify parameters as follows:

SELECT *
FROM orders_table AS orders
JOIN dimension_table
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '1 h', 'streaming-source.partition-order' = 'partition-name') */
FOR SYSTEM_TIME AS OF orders.proctime AS dim -- Temporal table (Dimension table)
ON orders.product_id = dim.product_id;
Copy the code

Temporal Join Latest table

For Hive non-partitioned tables, when temporal Join is used, the whole Hive table is cached in Slot memory and then matched with data in the flow based on the corresponding key. Using the latest Hive table to perform temporal Join does not require additional configuration. You only need to configure a TTL for the Hive table cache. The TTL is used to rescan the Hive table and load the latest data when the cache expires.

  • lookup.join.cache.ttl

    • Default value: 60 minutes
    • Description: Indicates the cache time. Hive dimension tables cache all dimension table data in TM memory. If the volume of dimension table data is large, OOM is generated. Of course, the TTL time should not be too short, because the data will be loaded frequently, which affects performance.

    Scream tips:

    When using this method, the Hive table must be a bounded lookup table, i.e. a tense table that is not a Streaming Source. In other words, the attribute of the table streaming-source-enable = false.

    If you want to use the Streaming Source temporal table, remember to set the streaming-source-monitor-interval value, which is the time interval for data updates.

Hive dimension table data is loaded in batch mode by day
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10.4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...
) TBLPROPERTIES (
  'streaming-source.enable' = 'false'.-- Close streaming Source
  'streaming-source.partition.include' = 'all'.-- Read all data
  'lookup.join.cache.ttl' = '12 h'
);
-- Kafka fact table
SET table.sql-dialect=default;
CREATE TABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME()
) WITH(...). ;-- Hive dimension table join, Flink will load all data of the dimension table to the memory
SELECT *
FROM orders_table AS orders
JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim
ON orders.product_id = dim.product_id;
Copy the code

Scream tips:

1. Each sub-task needs to cache the full data of a dimension table. Ensure that the TM Task Slot is large enough to accommodate the data volume of the dimension table.

2. You are advised to set the values of streaming-source.monitor-interval and lookup.join.cache. TTL to a large value because frequent updates and loading of data may affect performance.

3. When cached dimension table data needs to be refreshed, the current practice is to load the entire table, so new data cannot be distinguished from old data.

Hive dimension table JOIN example

If the dimension table data is loaded to Hive in batch mode (for example, daily) and the fact flow data in Kafka needs to be joined to build a wide table data, Hive dimension table JOIN can be used.

  • Create a kafka data source table, live stream
SET table.sql-dialect=default;
CREATE TABLE fact_user_behavior ( 
    `user_id` BIGINT.- user id
    `item_id` BIGINT.- the goods id
    `action` STRING, -- User behavior
    `province` INT.- Province of the user
    `ts` BIGINT.-- The timestamp of the user action
    `proctime` AS PROCTIME(), Generate a processing time column by calculating the column
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- Event time
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  - define the watermark
 ) WITH ( 
    'connector' = 'kafka'.Use the Kafka connector
    'topic' = 'user_behaviors'.Theme -- kafka
    'scan.startup.mode' = 'earliest-offset'.- the offset
    'properties.group.id' = 'group1'.-- Consumer Group
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092'.'format' = 'json'.-- The data source format is JSON
    'json.fail-on-missing-field' = 'true'.'json.ignore-parse-errors' = 'false'
);
Copy the code
  • Create a Hive dimension table
SET table.sql-dialect=hive;
CREATE TABLE dim_item (
  item_id BIGINT,
  item_name STRING,
  unit_price DECIMAL(10.4)
) PARTITIONED BY (dt STRING) TBLPROPERTIES (
  'streaming-source.enable' = 'true'.'streaming-source.partition.include' = 'latest'.'streaming-source.monitor-interval' = '12 h'.'streaming-source.partition-order' = 'partition-name'
);
Copy the code
  • Associate the latest data of the Hive dimension table
SELECT 
    fact.item_id,
    dim.item_name,
    count(*) AS buy_cnt
FROM fact_user_behavior AS fact
LEFT JOIN dim_item FOR SYSTEM_TIME AS OF fact.proctime AS dim
ON fact.item_id = dim.item_id
WHERE fact.action = 'buy'
GROUP BY fact.item_id,dim.item_name;
Copy the code

Use SQL Hint to associate a non-partitioned Hive dimension table:

set table.dynamic-table-options.enabled= true; 
SELECT 
    fact.item_id,
    dim.item_name,
    count(*) AS buy_cnt
FROM fact_user_behavior AS fact
LEFT JOIN dim_item1
/*+ OPTIONS('streaming-source.enable'='false', 'streaming-source.partition.include' = 'all', 'lookup.join.cache.ttl' = '12 h') */
FOR SYSTEM_TIME AS OF fact.proctime AS dim
ON fact.item_id = dim.item_id
WHERE fact.action = 'buy'
GROUP BY fact.item_id,dim.item_name;
Copy the code

conclusion

This article uses the latest version of Flink1.12 as an example to describe the different ways Flink can read and write Hive, and provides examples for each way. In practical applications, real-time data streams and Hive dimension tables are often joined to construct wide tables. Flink provides Hive dimension table joins to simplify user usage. In the end of this article, Flink will explain the basic steps of Hive dimension table JOIN and use examples, hoping to help you.

The public account “Big Data Technology and Data Warehouse”, reply to “information” to receive the big data data package