This document describes how HBase and Flink SQL work together. As an open-source implementation of Google's Bigtable paper, HBase is a distributed, column-oriented NoSQL database built on HDFS, well suited to large-scale real-time queries, which is why it is widely used in real-time computing: you can write to HBase in real time, or bulk-load the HFiles produced by offline jobs into HBase tables. Flink SQL's popularity needs no introduction, and it provides a connector for HBase. It is therefore well worth practicing the combination of HBase and Flink SQL.

Of course, this article assumes some basic HBase knowledge and does not cover the HBase architecture and principles in detail; it focuses on combining HBase and Flink in practical scenarios. There are two main scenarios: in the first, HBase serves as a dimension table in a temporal table join with a Flink Kafka table; in the second, Flink SQL writes its calculation results into an HBase table for other users to query. The content of this article is therefore as follows:

· HBase environment preparation
· Data preparation
· Scenario where HBase serves as a dimension table in a temporal table join
· Scenario where Flink SQL writes calculation results to HBase
· Summary

1. HBase environment preparation

We have no HBase environment available for testing and want to avoid contaminating the online HBase environment, so we built an HBase Docker image (docker pull guxinglei/myhbase). HBase 2.2.0 and JDK 1.8 are installed on top of the official clean Ubuntu image.

Start the container, exposing the HBase Web UI port and the built-in ZooKeeper port, so that you can view cluster information in the Web UI and have the ZooKeeper connection information required to create a Flink HBase table.

docker run -it --network=host -p 2181:2181 -p 60011:60011 docker.io/guxinglei/myhbase:latest bash

· Enter the container, start the HBase cluster, and start the REST server, so that we can later use the REST API to read the data that Flink SQL writes into HBase.

# start the HBase cluster (start-hbase.sh is the standard HBase startup script)
bin/start-hbase.sh
# start the REST server on port 8000
bin/hbase-daemon.sh start rest -p 8000
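
Once the REST server is up, an optional sanity check can confirm it is reachable (the /version endpoint is part of the standard HBase REST API):

curl http://localhost:8000/version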

2. Data preparation

Because the HBase environment here is a standalone service brought up temporarily, it holds no data, so we first need to write some data into it for the examples that follow. The second article in the Flink SQL practice series showed how to register a Flink MySQL table; we can extract the ad space table from MySQL into an HBase table to serve as the dimension table in a temporal table join. We therefore need to create a table in HBase and a Flink HBase table, the two being associated through Flink SQL's HBase connector.

· Start the HBase shell in the container and create an HBase table named dim_hbase, as follows:

# create table dim_hbase
hbase(main):002:0> create 'dim_hbase','cf'
Created table dim_hbase
Took 1.3120 seconds
=> Hbase::Table - dim_hbase

· Create the Flink HBase table in Flink, as follows:

# register the Flink HBase table
DROP TABLE IF EXISTS flink_rtdw.demo.hbase_dim_table;
CREATE TABLE flink_rtdw.demo.hbase_dim_table (
  rowkey STRING,
  cf ROW < adspace_name STRING >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'dim_hbase',
  'sink.buffer-flush.max-rows' = '1000',
  'zookeeper.quorum' = 'localhost:2181'
);
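
Note how the Flink HBase connector maps the schema: one atomic field (rowkey here) is declared as the HBase rowkey, and each column family is declared as a ROW-typed field whose nested fields become the column qualifiers under that family.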

· With the Flink MySQL table and the Flink HBase table both created, we can write the SQL job that extracts data into HBase. The SQL statement is as follows:

INSERT INTO hbase_dim_table
SELECT CAST(id AS VARCHAR), ROW(name)
FROM mysql_dim_table;
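
To sanity-check the extraction, a quick scan from the HBase shell works (a sketch; LIMIT keeps the output short):

hbase(main):003:0> scan 'dim_hbase', {LIMIT => 1}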

3. Scenario where HBase functions as a dimension table and performs a temporal table join with Kafka

Dimension table joins are indispensable in Flink SQL: joining order amounts with an exchange rate table, joining a clickstream with an ad space table, and so on are all widely used. As a distributed database, HBase has advantages over MySQL when acting as the dimension table in such joins. The second article in the Flink SQL practice series registered a Flink Kafka table for the ad clickstream topic and introduced the use of temporal table joins in Flink SQL; in this section, HBase serves as the dimension table. Since the data has already been extracted to HBase above, we can write the temporal table join logic directly.

· The Flink Kafka table (the ad clickstream) and the Flink HBase table (the ad space dimension) perform a temporal table join on the ad space id, outputting the ad space id and its Chinese name. The join SQL is as follows:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       hbase_dim_table.cf.adspace_name as publisher_adspace_name
from adsdw_dwd_max_click_mobileapp
left join hbase_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTime
on cast(adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as string) = hbase_dim_table.rowkey;
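
One detail worth calling out: FOR SYSTEM_TIME AS OF requires a processing-time attribute on the probe side. We assume the Kafka table registered in part two of this series declares one as a computed column, along the lines of the sketch below (the column name matches the procTime used above; the rest of that DDL is omitted):

-- assumed computed column in the adsdw_dwd_max_click_mobileapp DDL:
--   procTime AS PROCTIME()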

· After the temporal table join job is submitted, its status on the Flink cluster and the join results can be viewed in the Flink Web UI.

4. Scenario where calculation results are written to HBase

Using HBase as a dimension table in a temporal table join, as in the section above, is very common; so is using HBase to store calculation results. After all, as a distributed database whose underlying storage is HDFS with its multi-replica mechanism, HBase is easy to maintain, convenient to scale out, and fast for real-time queries, and downstream clients can consume the data stored in it directly. This section describes how Flink SQL writes calculation results to HBase and how to query those results through the REST API.

· Enter the container and create the HBase table; a single column family meets the requirements. The DDL is as follows:

# HBase sink table
create 'dwa_hbase_click_report','cf'

· After the HBase table is created, we need to create the corresponding Flink HBase table in Flink SQL, this time specifying the columns under the cf column family. The Flink Kafka table for the clickstream was already registered in part two of the Flink SQL practice series; in this section we compute uv and click_count over that stream, so the two columns are uv and click_count. The DDL is as follows:

# Flink HBase table
DROP TABLE IF EXISTS flink_rtdw.demo.dwa_hbase_click_report;
CREATE TABLE flink_rtdw.demo.dwa_hbase_click_report (
  rowkey STRING,
  cf ROW < uv BIGINT, click_count BIGINT >,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'table-name' = 'dwa_hbase_click_report',
  'sink.buffer-flush.max-rows' = '1000',
  'zookeeper.quorum' = 'hostname:2181'
);

· The Flink Kafka table for the clickstream, the HBase result table, and the Flink HBase table are all ready, so we can compute uv and click count over a one-minute tumbling window and write the results into HBase. Anyone familiar with HBase knows that rowkey design has a significant impact on how data is distributed across HBase regions. To spread the writes, our rowkey uses Flink SQL's built-in REVERSE function to reverse the ad space id and then concatenates the window start time. The SQL logic is as follows:

INSERT INTO dwa_hbase_click_report
SELECT
CONCAT(REVERSE(CAST(publisher_adspace_adspaceId AS STRING)) ,
'_',
CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING)
  ) as rowkey, 
ROW(COUNT(DISTINCT audience_mvid) , COUNT(audience_behavior_click_creative_impressionId)) as cf
FROM
  adsdw_dwd_max_click_mobileapp
WHERE publisher_adspace_adspaceId IS NOT NULL AND audience_mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULL
GROUP BY
  TUMBLE(ets, INTERVAL '1' MINUTE),
  publisher_adspace_adspaceId;
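
To make the rowkey layout concrete, here is a sketch of what the expression produces, assuming an ad space id of 2162210 (whose reverse is the 0122612 prefix seen in the REST query example below) and a window start of 1606295280000 ms:

SELECT CONCAT(REVERSE(CAST(2162210 AS STRING)), '_', '1606295280000');
-- returns '0122612_1606295280000'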

· After the SQL job is submitted, its status and results can be checked in the Flink Web UI.

The SQL job above has written its calculation results to HBase. For an online HBase service, many colleagues may not have HBase client permissions and therefore cannot read data through the HBase shell; and an online reporting service obviously cannot query data through the HBase shell either. In real-time reporting scenarios, data engineers write the data into HBase and front-end engineers read it through a REST API. Since we have already started the HBase REST server process, we can read HBase data through that REST service.

· Let’s first get a piece of data just written to HBase, as follows:
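
A minimal sketch from the HBase shell inside the container, using the rowkey from the REST example below (note that uv and click_count are written by the connector as 8-byte binary longs, so the shell prints raw bytes rather than decimal numbers):

get 'dwa_hbase_click_report', '0122612_1606295280000'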

· You can query HBase data through the REST API. First, run the following statement to obtain a scannerId. Note that the rowkey to be queried must be base64-encoded before it can be used, and query results must be base64-decoded:

Rowkey before base64 encoding: 0122612_1606295280000
Rowkey after base64 encoding: MDEyMjYxMl8xNjA2Mjk1MjgwMDAw
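
The encoding can be produced with a standard base64 tool, for example:

echo -n '0122612_1606295280000' | base64
# => MDEyMjYxMl8xNjA2Mjk1MjgwMDAw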

curl -vi -X PUT \
         -H "Accept: text/xml" \
         -H "Content-Type: text/xml" \
         -d '<Scanner startRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw" endRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw"></Scanner>' \
"http://hostname:8000/dwa_hbase_click_report/scanner"
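
The 201 Created response carries the scanner URL in its Location header; its trailing path segment is the scannerId used in the next step, for example (the id differs per request):

Location: http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5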

· Step 2: run the following statement to fetch the data through the scannerId returned by the previous request; the result is visible in the response:

curl -vi -X GET \
         -H "Accept: application/json" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"
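
As noted above, the row key, column qualifiers, and cell values in the JSON response are base64-encoded and need to be decoded, for example:

echo -n 'MDEyMjYxMl8xNjA2Mjk1MjgwMDAw' | base64 -d
# => 0122612_1606295280000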

· Step 3: run the following command to delete this scannerId:

curl -vi -X DELETE \
         -H "Accept: text/xml" \
"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"

5. Summary

In this article, we introduced two scenarios for combining HBase and Flink SQL: HBase as a dimension table in a temporal table join, and HBase as a store for calculation results. We also used the REST API to query HBase data, which avoids exposing HBase's ZooKeeper directly and decouples the REST server from the HBase cluster.

About the author

Yu Ao is a senior data development engineer at 360, currently focused on building and platformizing real-time data warehouses based on Flink, with extensive ETL and warehouse development experience in Flink, Kafka, Hive, Spark, and related technologies.