In SQL tasks often meet the requirements of a column to multiple rows, the following to summarize in Flink SQL how to achieve column to row, first look at the following a specific case.

demand

The original data format is as follows:

name data
JasonLee [{“content_type”:”flink”,”url”:”111″},{“content_type”:”spark”,”url”:”222″},{“content_type”:”hadoop”,”url”:”333″}]

The data format

{
    "name""JasonLee"."data": [{
            "content_type""flink"."url""111"
        }, {
            "content_type""spark"."url""222"
        },
        {
            "content_type""hadoop"."url""333"}}]Copy the code

The desired data format now looks like this:

name content_type url
JasonLee flink 111
JasonLee spark 222
JasonLee hadoop 333

This is a typical column to row or row to row scenario, the need to split the data column into multiple rows and columns, the following two implementation methods.

  1. Use Flink’s built-in unnest function to parse
  2. ##### uses custom UDTF functions for parsing

Build a table DDL

CREATE TABLE kafka_table (
name string,
`data` ARRAY<ROW<content_type STRING,url STRING>>
)
WITH (
    'connector' = 'kafka'.Use the Kafka connector
    'topic' = 'test'.'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092'.-- Broker connects information
    'properties.group.id' = 'jason_flink_test'.Group_id of kafka
    'scan.startup.mode' = 'latest-offset'.-- The location where data is read
    'format' = 'json'.-- The data source format is JSON
    'json.fail-on-missing-field' = 'false'.-- The lost field task does not fail
    'json.ignore-parse-errors' = 'true'  -- Parsing failure skipped
)
Copy the code

Here we define the data field type as ARRAY directly, because the unnest function requires an ARRAY parameter.

Unnest parsing
select name,content_type,url
from kafka_table CROSS JOIN UNNEST(`data`) AS t (content_type,url)
Copy the code
select name,content_type,url
from kafka_table, UNNEST(`data`) AS t (content_type,url)
Copy the code
select name,content_type,url
from kafka_table left join UNNEST(`data`) AS t (content_type,url) on true
Copy the code

Custom UDTF resolution

Custom table valued functions (UDTF), custom table valued functions that take zero, one, or more scalar values as input arguments (which can be variable-length arguments). Similar to, but different from, custom scalar functions. Table-valued functions can return any number of rows as output, not just one value. The returned row can consist of one or more columns. Calling a function once outputs more than one row or column of data. You must inherit the TableFunction base class and implement one or more methods named eval. When using UDTF, you need two keywords: LATERAL TABLE.

@FunctionHint(output = @DataTypeHint("ROW<content_type STRING,url STRING>"))
public class ParserJsonArrayTest extends TableFunction<Row{

    private static final Logger log = Logger.getLogger(ParserJsonArrayTest.class);

    public void eval(String value) {
        try {
            JSONArray snapshots = JSONArray.parseArray(value);
            Iterator<Object> iterator = snapshots.iterator();
            while (iterator.hasNext()) {
                JSONObject jsonObject = (JSONObject) iterator.next();
                String content_type = jsonObject.getString("content_type");
                String url = jsonObject.getString("url"); collect(Row.of(content_type,url)); }}catch (Exception e) {
            log.error("parser json failed :"+ e.getMessage()); }}}Copy the code

When customizing UDTF parsing, you do not need to define the data field as an ARRAY, but can directly define it as a STRING. In addition, this method is more flexible. For example, when you need to filter data or perform more complex operations, you can complete them in the UDTF.

Flink SQL uses UDTF

select name,content_type,url
from kafka_table CROSS JOIN lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)
Copy the code
select name,content_type,url
from kafka_table, lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)
Copy the code
select name,content_type,url
from kafka_table left join lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url) on true
Copy the code

Note:

Both unnest and custom UDTF functions can be written in three different ways. The first two methods have the same effect. The third method is equivalent to left Join. The difference is that CROSS JOIN/INNER JOIN: for each row of the left table, the right UDTF does not output, then this row does not output. LEFT JOIN: For each row in the LEFT table, if the UDTF on the right does not output, the row will output, and the UDTF field on the right is null

Printed result

2> JasonLee,flink,111
2> JasonLee,spark,222
2> JasonLee,hadoop,333
Copy the code

conclusion

In the actual use, if Unnest can meet the requirements, unnest can be used directly without additional development. If Unnest function cannot meet the requirements, customized UDTF can be used to complete it.