Flink SQL background

 

Flink SQL is a development language with standard SQL semantics, designed by the Flink real-time computing team to simplify the computation model and lower the barrier to entry for real-time computing.

In 2015, Alibaba began investigating open source computing engines and ultimately decided to build its next-generation computing engine on top of Flink, optimizing and improving on Flink's shortcomings. It open-sourced the resulting code in early 2019 under the name Blink. One of Blink's most notable contributions to the original Flink is the implementation of Flink SQL.

Flink SQL is a user-facing API layer. In traditional streaming frameworks such as Storm and Spark Streaming, users write business logic in Java or Scala against function or DataStream APIs. While this approach is flexible, it has drawbacks: a relatively high learning curve, difficult tuning, and many API incompatibilities as versions are updated.

 

In this context, SQL is undoubtedly the best choice for us. We chose SQL as our core API because it has several very important characteristics:

  • SQL is a declarative (set-based) language: users only need to state what they want, not how to compute it;
  • SQL can be optimized: built-in query optimizers translate SQL into an optimal execution plan;
  • SQL is easy to understand: people across different industries and fields understand it, and the learning cost is low;
  • SQL is very stable: SQL itself has changed little over the database industry's 30-plus-year history;
  • Stream-batch unification: Flink's underlying runtime is itself a unified stream and batch engine, and SQL enables stream-batch unification at the API layer.

Flink’s latest features (updates 1.7.0 and 1.8.0)

2.1 New Flink 1.7.0 features

With Flink 1.7.0, the Flink community moves closer to the goal of fast data processing and seamlessly building data-intensive applications. The release includes several new features and improvements, such as Scala 2.12 support, an exactly-once S3 file sink, the integration of complex event processing with streaming SQL, and more.

Scala 2.12 support in Apache Flink (Flink-7811)

Apache Flink 1.7.0 is the first release to fully support Scala 2.12. This allows users to write Flink applications using newer Scala versions and take advantage of the Scala 2.12 ecosystem.

State Evolution (Flink-9376)

In many cases, long-running Flink applications need to evolve over their life cycle due to changing requirements. Changing user state without losing the current application progress is a key requirement for application evolution. With Flink 1.7.0, the community added state schema evolution, which lets you adjust the schema of user state in long-running applications while staying compatible with previous savepoints. With state schema evolution, columns can be added to or removed from the state schema to change which business facts the application captures after it has been deployed. State schema evolution works out of the box when Avro-generated classes are used as user state, which means the state schema can evolve according to Avro specifications. While Avro types are the only built-in types supporting schema evolution in Flink 1.7, the community continues to work on extending support to other types in future Flink releases.

Streaming SQL Support (Flink-6935)

This is an important addition to Apache Flink 1.7.0, which provides initial support for the MATCH_RECOGNIZE standard in Flink SQL. This feature combines complex event processing (CEP) and SQL, making it easy to pattern-match over data streams and enabling a whole new set of use cases. The feature is currently in beta, so the community welcomes any feedback and suggestions.
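As a rough sketch only (the Ticker table, its columns, and the pattern are hypothetical, and the beta feature set in 1.7 may differ), a MATCH_RECOGNIZE query submitted through the Table API might look like this, assuming a StreamTableEnvironment named tableEnv:

// Hedged sketch: per stock symbol, detect an event A followed by an event B
// whose price dropped by more than 10%. "Ticker", "symbol", "rowtime" and
// "price" are illustrative names, not part of the original article.
Table drops = tableEnv.sqlQuery(
    "SELECT * " +
    "FROM Ticker " +
    "MATCH_RECOGNIZE ( " +
    "  PARTITION BY symbol " +
    "  ORDER BY rowtime " +
    "  MEASURES A.price AS startPrice, B.price AS dropPrice " +
    "  ONE ROW PER MATCH " +
    "  AFTER MATCH SKIP PAST LAST ROW " +
    "  PATTERN (A B) " +
    "  DEFINE B AS B.price < A.price * 0.9 " +
    ")");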

Temporal tables and time joins in Streaming SQL (Flink-9712)

Temporal tables are a new concept in Apache Flink that provide a (parameterized) view of a table's change history and return its contents at a specific point in time. For example, consider a table of historical currency exchange rates: over time, the table grows as new, updated rates are appended. A temporal table is a view that returns the actual state of those exchange rates at any given point in time. With such a table, an order stream in different currencies can be converted to a common currency using the exchange rate that was valid at the time of each order. Temporal joins allow memory- and compute-efficient streaming joins against such constantly changing/updating tables.
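As a hedged sketch (assuming a StreamTableEnvironment tableEnv, a registered Orders table with rowtime, amount and currency columns, and a RatesHistory table with r_proctime, r_currency and rate columns; needs org.apache.flink.table.functions.TemporalTableFunction), a temporal table function and a time join could be set up like this:

// Register the rate change history as a temporal table function,
// keyed by currency and versioned by the r_proctime time attribute.
Table ratesHistory = tableEnv.scan("RatesHistory");
TemporalTableFunction rates =
        ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
tableEnv.registerFunction("Rates", rates);

// Join each order against the rate that was valid at the order's time,
// converting the order amount into the common currency.
Table converted = tableEnv.sqlQuery(
    "SELECT o.amount * r.rate AS converted_amount " +
    "FROM Orders AS o, " +
    "     LATERAL TABLE (Rates(o.rowtime)) AS r " +
    "WHERE o.currency = r.r_currency");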

Other features of Streaming SQL

In addition to the main features mentioned above, Flink's Table & SQL API has been extended to cover more use cases. The following built-in functions were added to the API: TO_BASE64, LOG2, LTRIM, REPEAT, REPLACE, COSH, SINH, TANH. The SQL Client now supports defining views both in environment files and in CLI sessions, and basic SQL statement completion has been added to the CLI. The community also added an Elasticsearch 6 table sink that allows storing the updating results of dynamic tables.

Kafka 2.0 Connector (Flink-10598)

Apache Flink 1.7.0 continues to add more connectors, making it easier to interact with external systems. In this release, the community added the Kafka 2.0 connector, which allows reading from and writing to Kafka 2.0 with exactly-once guarantees.
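As a minimal sketch (broker address, topic name and group id are placeholders), reading from Kafka 2.0 with the universal connector might look like this:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing so the consumer can participate in exactly-once processing
        env.enableCheckpointing(5000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "flink-demo");              // placeholder consumer group

        // FlinkKafkaConsumer (without a version suffix) is the universal connector
        // from flink-connector-kafka that works with Kafka 2.0 brokers.
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("kafka-universal-connector-demo");
    }
}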

Local Recovery (Flink-9635)

Apache Flink 1.7.0 completes local recovery by extending Flink's scheduling to take the previous deployment location into account when restoring. If local recovery is enabled, Flink keeps a local copy of the latest checkpoint on the machine where each task runs. By scheduling tasks back to their previous locations, Flink can restore state by reading the checkpoint state from the local disk, minimizing network traffic. This feature greatly improves recovery speed.

2.2 Flink 1.8.0 new features

Flink 1.8.0 introduces state cleaning

Flink 1.8 introduces continuous cleanup of old state for the RocksDB state backend (FLINK-10471) and the heap state backend (FLINK-10473). This means that expired state entries are continuously cleaned up in the background, according to the TTL settings.
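For context, the TTL configuration that this cleanup is based on is set on the state descriptor. A minimal sketch with a hypothetical 7-day retention on a keyed value state (the 1.8 background cleanup strategies are additional options on the same builder):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlConfigExample {
    public static void main(String[] args) {
        // Expire state entries 7 days after they were last created or written.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(7))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // also clean expired entries when a full snapshot (checkpoint/savepoint) is taken
                .cleanupFullSnapshot()
                .build();

        // Attach the TTL config to a state descriptor used inside a keyed function.
        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("last-login", String.class);
        descriptor.enableTimeToLive(ttlConfig);
    }
}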

Additions and removals in the Table API

1) Introduction of a new CSV format (Flink-9964)

This release introduces a new format descriptor for RFC 4180-compliant CSV files, available as org.apache.flink.table.descriptors.Csv. Currently it can only be used together with the Kafka connector. The old descriptor remains available as org.apache.flink.table.descriptors.OldCsv and is used with file system connectors.

2) Deprecation of static generator methods in TableEnvironment (Flink-11445)

To separate the API from its actual implementation, the static method TableEnvironment.getTableEnvironment() is deprecated. It is now recommended to use BatchTableEnvironment.create() or StreamTableEnvironment.create() instead.
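A minimal sketch of the recommended way to obtain table environments in 1.8 (assuming the Java bridge modules are on the classpath):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class TableEnvCreation {
    public static void main(String[] args) {
        // Batch: instead of TableEnvironment.getTableEnvironment(env)
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);

        // Streaming: instead of TableEnvironment.getTableEnvironment(streamEnv)
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv);
    }
}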

3) Changes in Maven module for table API (Flink-11064)

Users who previously depended on flink-table now need to depend on flink-table-planner, as well as the correct flink-table-api-* module depending on whether Java or Scala is used: flink-table-api-java-bridge or flink-table-api-scala-bridge.

Kafka Connector modifications

A new KafkaDeserializationSchema that gives direct access to the Kafka ConsumerRecord was introduced (FLINK-8354). For FlinkKafkaConsumers, the new KafkaDeserializationSchema allows deserialization logic to read the full ConsumerRecord, including its key, headers, and metadata.
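As a minimal sketch (assuming the flink-connector-kafka dependency), a KafkaDeserializationSchema that reads both the record key and value, which a plain DeserializationSchema cannot do, might look like this:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KeyValueDeserializationSchema
        implements KafkaDeserializationSchema<Tuple2<String, String>> {

    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false; // the stream never ends
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
        // direct access to key, value, topic, partition, offset, headers, ...
        String key = record.key() == null ? null : new String(record.key());
        String value = record.value() == null ? null : new String(record.value());
        return Tuple2.of(key, value);
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return Types.TUPLE(Types.STRING, Types.STRING);
    }
}

An instance of such a schema is passed to the FlinkKafkaConsumer constructor in place of a DeserializationSchema.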

Flink SQL programming model

The basic building blocks of Flink's programming model are streams and transformations; each data stream starts from one or more sources and ends at one or more sinks.

 

You are probably already familiar with the figure above. A Flink program based on Flink SQL likewise consists of three parts: reading the source data, the computation logic, and writing the computed results.

A complete Flink SQL program consists of the following three parts:

Source Operator: the Source Operator is an abstraction of external data sources. Apache Flink currently ships with many built-in implementations of common data sources, such as MySQL and Kafka.

Transformation Operators: these operators perform the actual processing, such as queries and aggregations. Flink SQL currently supports most operations found in traditional databases, such as Union, Join, Projection, Difference, Intersection, and windows.

Sink Operator: the Sink Operator is an abstraction of the external result table. Apache Flink also ships with many built-in result table abstractions, such as the Kafka sink.

Let's look at the differences between traditional development based on the DataSet/DataStream API and SQL-based development, using the classic WordCount program.

DataStream/DataSet API
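A minimal WordCount sketch using the DataSet API (the class name and sample input are illustrative):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements("flink sql", "flink table", "flink sql");

        DataSet<Tuple2<String, Integer>> counts = text
                // split each line into (word, 1) pairs
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split("\\s+")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                // group by the word (field 0) and sum the counts (field 1)
                .groupBy(0)
                .sum(1);

        counts.print();
    }
}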

Flink SQL

SELECT word, COUNT(word) FROM table GROUP BY word;

Even from this small example, we can see intuitively how fast and convenient SQL development is.

Flink SQL syntax and operators

4.1 Syntax supported by Flink SQL

The semantics of Flink SQL's core operators follow ANSI SQL standards such as SQL-92 and SQL:2011. Flink uses Apache Calcite to parse SQL, and Calcite supports standard ANSI SQL.

So what SQL syntax does Flink itself support?

insert:
  INSERT INTO tableReference query

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
  [ ORDER BY orderItem [, orderItem ]* ]
  [ LIMIT { count | ALL } ]
  [ OFFSET start { ROW | ROWS } ]
  [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
  windowName
  | windowSpec

windowSpec:
  [ windowName ]
  '('
  [ ORDER BY orderItem [, orderItem ]* ]
  [ PARTITION BY expression [, expression ]* ]
  [ RANGE numericOrIntervalExpression { PRECEDING } | ROWS numericExpression { PRECEDING } ]
  ')'

The grammar above already shows which operators Flink SQL supports. Next, we introduce the semantics of the most common operators in Flink SQL.

4.2 Flink Common SQL operators

SELECT

SELECT is used to select data from a DataSet/DataStream and project a subset of its columns.

Example:

SELECT * FROM Table;              -- query all the data in the table
SELECT name, age FROM Table;      -- select name and age from the table

At the same time, functions and aliases can be used in SELECT statements, such as WordCount we mentioned above:

SELECT word, COUNT(word) FROM table GROUP BY word;

WHERE

WHERE is used to filter data from a DataSet/DataStream. Used together with SELECT, it splits a relation horizontally based on given conditions, that is, it selects only the records that satisfy those conditions.

Example:

SELECT name, age FROM Table WHERE name LIKE '%xiaoming%';
SELECT * FROM Table WHERE age = 20;

Flink SQL also supports combining =, <, >, <>, >=, <=, AND, and OR expressions in the WHERE clause; the data that satisfies the filter condition is returned. WHERE can also be used together with IN and NOT IN. Here is an example:

SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)

DISTINCT

DISTINCT is used to de-duplicate data sets/streams based on the results of the SELECT.

Example:

SELECT DISTINCT name FROM Table;

For streaming queries, the state required to compute the query result can grow without bound, so users need to limit the state scope of the query themselves to prevent the state from becoming excessively large.
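As a hedged sketch of one way to bound that state (assuming a streaming job with a StreamTableEnvironment named tableEnv and a registered table named Table1; the snippet uses org.apache.flink.api.common.time.Time, org.apache.flink.table.api.StreamQueryConfig and org.apache.flink.types.Row), the idle state retention time of the query configuration can be set like this:

// Keep state for idle keys for at least 12 hours and at most 24 hours;
// state that has not been updated within that time is cleared.
StreamQueryConfig queryConfig = tableEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

Table distinctNames = tableEnv.sqlQuery("SELECT DISTINCT name FROM Table1");

// The query config takes effect when the result table is emitted as a stream.
tableEnv.toRetractStream(distinctNames, Row.class, queryConfig).print();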

GROUP BY

GROUP BY is used to group data. For example, to calculate each student's total score in a score table:

SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;

UNION and UNION ALL

UNION is used to combine two result sets, which must have exactly the same fields, including field types and field order. Unlike UNION ALL, UNION deduplicates the result data.

Example:

SELECT * FROM T1 UNION [ALL] SELECT * FROM T2;

JOIN

JOIN is used to JOIN data from two tables to form a result table. The JOIN types supported by Flink include:

JOIN – INNER JOIN

LEFT JOIN – LEFT OUTER JOIN

RIGHT JOIN – RIGHT OUTER JOIN

FULL JOIN – FULL OUTER JOIN

The semantics of the JOIN here are the same as the JOIN semantics we use in relational databases.

Example:

SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id;

LEFT JOIN differs from JOIN in that when a row in the left table has no matching row in the right table, the right-side fields are output as NULL. RIGHT JOIN behaves like a LEFT JOIN with the two tables swapped. FULL JOIN is equivalent to the UNION ALL of a LEFT JOIN and a RIGHT JOIN.

Example:

SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id;
SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id;
SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id;

Group Window

Based on how window data is divided, Apache Flink currently supports the following three kinds of bounded windows:

Tumble: tumbling window, fixed window size, window data does not overlap;

Hop: sliding window, fixed window size with a fixed slide interval at which new windows start, window data can overlap;

Session: session window, no fixed window size; windows are delimited by periods of activity in the data, and window data does not overlap.

Tumble Window

A Tumble (tumbling) window has a fixed size, and its window data does not overlap. The semantics are as follows:

 

The syntax for tumbling windows is as follows:

SELECT 
    [gk],
    [TUMBLE_START(timeCol, size)], 
    [TUMBLE_END(timeCol, size)], 
    agg1(col1), 
    ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)

Among them:

[GK] determines whether aggregation by field is required;

TUMBLE_START indicates the window start time;

TUMBLE_END indicates the window end time.

TimeCol is a time field in the flow table.

Size indicates the window size, such as seconds, minutes, hours, and days.

For example, if we want to calculate the daily order volume of each person, we can aggregate the orders by user:

SELECT user, TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user;

Hop Window

A Hop (sliding) window is similar to a tumbling window in that the window has a fixed size. Unlike a tumbling window, the slide parameter controls how frequently a new sliding window starts, so when the slide value is smaller than the window size, multiple sliding windows overlap. The semantics are as follows:

 

The syntax of the Hop sliding window is as follows:

SELECT 
    [gk], 
    [HOP_START(timeCol, slide, size)] ,  
    [HOP_END(timeCol, slide, size)],
    agg1(col1), 
    ... 
    aggN(colN) 
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)

Each field means something similar to the Tumble window:

  • [GK] determines whether aggregation by field is required;
  • HOP_START indicates the window start time.
  • HOP_END indicates the window end time.
  • TimeCol indicates the time field in the flow table.
  • Slide indicates the size of each window slide;
  • Size indicates the size of the entire window, such as seconds, minutes, hours, and days.

For example, to count the sales of each item over the past 24 hours, updated every hour:

SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

Session Window

Session time windows have no fixed duration; their bounds are defined by an interval of inactivity, which means that a session window closes if no event occurs during the defined gap period.

 

The syntax for a Session window is as follows:

SELECT 
    [gk], 
    SESSION_START(timeCol, gap) AS winStart,  
    SESSION_END(timeCol, gap) AS winEnd,
    agg1(col1),
     ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
  • [gk] determines whether aggregation by field is required;
  • SESSION_START indicates the window start time.
  • SESSION_END indicates the window end time.
  • TimeCol indicates the time field in the flow table.
  • Gap indicates the length of the window data inactive cycle.

For example, we need to calculate the number of orders within 12 hours of each user’s access time:

SELECT user, SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
       SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd, SUM(amount)
FROM Orders
GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user;

Flink SQL built-in functions

Flink provides a large number of built-in functions for us to use directly. The commonly used built-in functions are classified as follows:

  • Comparison functions
  • Logical functions
  • Arithmetic functions
  • String handling functions
  • Time functions

We’ll look at each of these functions with examples.

5.1 Comparison Function

 

5.2 Logical Functions

 

5.3 Arithmetic Function

 

In addition to the functions in the table above, Flink SQL supports a rich variety of function calculations.

5.4 String Handling functions

 

5.5 Time Function

 
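To close out this section, here is a hedged sketch that combines a few functions from the categories above in a single query (the Students table, its columns, and the table environment tableEnv are illustrative assumptions, not part of the original article):

// Illustrative only: UPPER/CHAR_LENGTH (string), ABS (arithmetic),
// >= / AND / NOT (comparison and logic), CURRENT_TIMESTAMP (time).
Table fnDemo = tableEnv.sqlQuery(
    "SELECT " +
    "  UPPER(name) AS upper_name, " +
    "  CHAR_LENGTH(name) AS name_len, " +
    "  ABS(score - 60) AS diff_from_pass, " +
    "  (score >= 60 AND NOT (name = '')) AS passed, " +
    "  CURRENT_TIMESTAMP AS query_time " +
    "FROM Students");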

Flink SQL in practice

Above we introduced the background of Flink SQL, its new features, its programming model, and its common operators. In this section we simulate a real case and walk through a complete Flink SQL demo.

I’m sure there are a lot of NBA fans out there. Let’s say we had a statistical record of the scoring leaders each season, including seasons, players, games played, starts, minutes, assists, steals, blocks, points, etc. Now we’re going to count the three players who won scoring titles the most.

The original data is stored in the score.csv file as follows:

17-18,James Harden,72,72,35.4,8.8,1.8,0.7,30.4
16-17,Russell Westbrook,81,81,34.6,10.4,1.6,0.4,31.6
15-16,Stephen Curry,79,79,34.2,6.7,2.1,0.2,30.1
14-15,Russell Westbrook,67,67,34.4,8.6,2.1,0.2,28.1
13-14,Kevin Durant,81,81,38.5,5.5,1.3,0.7,32
12-13,Carmelo Anthony,67,67,37,2.6,0.8,0.5,28.7
11-12,Kevin Durant,66,66,38.6,3.5,1.3,1.2,28
10-11,Kevin Durant,78,78,38.9,2.7,1.1,1,27.7
09-10,Kevin Durant,82,82,39.5,2.8,1.4,1,30.1
08-09,Dwyane Wade,79,79,38.6,7.5,2.2,1.3,30.2
07-08,LeBron James,75,74,40.4,7.2,1.8,1.1,30
06-07,Kobe Bryant,77,77,40.8,5.4,1.4,0.5,31.6
05-06,Kobe Bryant,80,80,41,4.5,1.8,0.4,35.4
04-05,Allen Iverson,75,75,42.3,7.9,2.4,0.1,30.7
03-04,Tracy McGrady,67,67,39.9,5.5,1.4,0.6,28
02-03,Tracy McGrady,75,74,39.4,5.5,1.7,0.8,32.1
01-02,Allen Iverson,60,59,43.7,5.5,2.8,0.2,31.4
00-01,Allen Iverson,71,71,42,4.6,2.5,0.3,31.1
99-00,Shaquille O'Neal,79,79,40,3.8,0.5,3,29.7
98-99,Allen Iverson,48,48,41.5,4.6,2.3,0.1,26.8
97-98,Michael Jordan,82,82,38.8,3.5,1.7,0.5,28.7
96-97,Michael Jordan,82,82,37.9,4.3,1.7,0.5,29.6
95-96,Michael Jordan,82,82,37.7,4.3,2.2,0.5,30.4
94-95,Shaquille O'Neal,79,79,37,2.7,0.9,2.4,29.3
93-94,David Robinson,80,80,40.5,4.8,1.7,3.3,29.8
92-93,Michael Jordan,78,78,39.3,5.5,2.8,0.8,32.6
91-92,Michael Jordan,80,80,38.8,6.1,2.3,0.9,30.1
90-91,Michael Jordan,82,82,37,5.5,2.7,1,31.5
89-90,Michael Jordan,82,82,39,6.3,2.8,0.7,33.6
88-89,Michael Jordan,81,81,40.2,8,2.9,0.8,32.5
87-88,Michael Jordan,82,82,40.4,5.9,3.2,1.6,35
86-87,Michael Jordan,82,82,40,4.6,2.9,1.5,37.1
85-86,Dominique Wilkins,78,78,39.1,2.6,1.8,0.6,30.3
84-85,Bernard King,55,55,37.5,3.7,1.3,0.3,32.9
83-84,Adrian Dantley,79,79,37.8,3.9,0.8,0.1,30.6
82-83,Alex English,82,82,36.4,4.8,1.4,1.5,28.4
81-82,George Gervin,79,79,35.7,2.4,1,0.6,32.3

First we need to create a project that has the following dependencies in Maven:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.7.1</flink.version>
    <slf4j.version>1.7.7</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>1.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>1.7.1</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>

First, create the context:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

Second, read score.csv as the source:

DataSet<String> input = env.readTextFile("score.csv");

DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
    @Override
    public PlayerData map(String s) throws Exception {
        String[] split = s.split(",");
        return new PlayerData(String.valueOf(split[0]),
                String.valueOf(split[1]),
                String.valueOf(split[2]),
                Integer.valueOf(split[3]),
                Double.valueOf(split[4]),
                Double.valueOf(split[5]),
                Double.valueOf(split[6]),
                Double.valueOf(split[7]),
                Double.valueOf(split[8])
        );
    }
});

public static class PlayerData {
    /**
     * season, player, games played, starts, minutes, assists, steals, blocks, points
     */
    public String season;
    public String player;
    public String play_num;
    public Integer first_court;
    public Double time;
    public Double assists;
    public Double steals;
    public Double blocks;
    public Double scores;

    public PlayerData() {
        super();
    }

    public PlayerData(String season, String player, String play_num, Integer first_court,
                      Double time, Double assists, Double steals, Double blocks, Double scores) {
        this.season = season;
        this.player = player;
        this.play_num = play_num;
        this.first_court = first_court;
        this.time = time;
        this.assists = assists;
        this.steals = steals;
        this.blocks = blocks;
        this.scores = scores;
    }
}

Third, register the source data as a table:

Table topScore = tableEnv.fromDataSet(topInput);
tableEnv.registerTable("score", topScore);

Fourth, write the core processing logic in SQL:

Table queryResult = tableEnv.sqlQuery(
    "SELECT player, COUNT(season) as num " +
    "FROM score " +
    "GROUP BY player " +
    "ORDER BY num DESC " +
    "LIMIT 3");

Fifth, output the results:

DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
result.print();

We run the program directly and observe the output:

...
16:28:06,162 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   - Shut down complete.
16:28:06,162 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService       - Stop job leader service.
16:28:06,164 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           - Stopped TaskExecutor akka://flink/user/taskmanager_2.
16:28:06,166 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        - Shutting down remote daemon.
16:28:06,166 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        - Remote daemon shut down; proceeding with flushing remote transports.
16:28:06,169 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        - Remoting shut down.
16:28:06,177 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka RPC service.
16:28:06,187 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
16:28:06,187 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
16:28:06,188 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:51703
16:28:06,188 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
Michael Jordan:10
Kevin Durant:4
Allen Iverson:4

We can see that the console prints the expected result:

 

The complete code is as follows:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class TableSQL {

    public static void main(String[] args) throws Exception {

        //1. Get the execution environment and the table environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

        //2. Read score.csv as the source
        DataSet<String> input = env.readTextFile("score.csv");
        input.print();

        DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
            @Override
            public PlayerData map(String s) throws Exception {
                String[] split = s.split(",");
                return new PlayerData(String.valueOf(split[0]),
                        String.valueOf(split[1]),
                        String.valueOf(split[2]),
                        Integer.valueOf(split[3]),
                        Double.valueOf(split[4]),
                        Double.valueOf(split[5]),
                        Double.valueOf(split[6]),
                        Double.valueOf(split[7]),
                        Double.valueOf(split[8])
                );
            }
        });

        //3. Register the DataSet as a table
        Table topScore = tableEnv.fromDataSet(topInput);
        tableEnv.registerTable("score", topScore);

        //4. Core logic: select player, count(season) as num from score group by player order by num desc limit 3
        Table queryResult = tableEnv.sqlQuery(
                "select player, count(season) as num from score group by player order by num desc limit 3");

        //5. Output the result
        DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
        result.print();
    }

    public static class PlayerData {
        /**
         * season, player, games played, starts, minutes, assists, steals, blocks, points
         */
        public String season;
        public String player;
        public String play_num;
        public Integer first_court;
        public Double time;
        public Double assists;
        public Double steals;
        public Double blocks;
        public Double scores;

        public PlayerData() {
            super();
        }

        public PlayerData(String season, String player, String play_num, Integer first_court,
                          Double time, Double assists, Double steals, Double blocks, Double scores) {
            this.season = season;
            this.player = player;
            this.play_num = play_num;
            this.first_court = first_court;
            this.time = time;
            this.assists = assists;
            this.steals = steals;
            this.blocks = blocks;
            this.scores = scores;
        }
    }

    public static class Result {
        public String player;
        public Long num;

        public Result() {
            super();
        }

        public Result(String player, Long num) {
            this.player = player;
            this.num = num;
        }

        @Override
        public String toString() {
            return player + ":" + num;
        }
    }
}

Of course, we can also define a Sink to output the result to a file, for example:

TableSink sink = new CsvTableSink("/home/result.csv", ",");
String[] fieldNames = {"name", "num"};
TypeInformation[] fieldTypes = {Types.STRING, Types.LONG};
tableEnv.registerTableSink("result", fieldNames, fieldTypes, sink);
queryResult.insertInto("result");
env.execute();

A result.csv file is generated under /home with the following content:

Michael Jordan,10

Kevin Durant,4

Allen Iverson,4

Summary

This article introduced the background of Flink SQL and most of its core features, covering the Flink SQL programming model, common operators, and built-in functions, and finally showed how to write a complete Flink SQL program through a full example. The ease of use of Flink SQL greatly lowers the barrier to Flink development and makes it a sharp tool we must master for solving streaming computation problems.