• Background
  • LookupableTableSource
  • Worked example
  • Source code analysis

Background

In stream processing, the dimension table is a very common concept. It is typically used in SQL JOINs to enrich streaming data. For example, suppose our source stream is order data from logs, but the logs only record the ID of the purchased product and nothing else. When we later load the data into the data warehouse for analysis, we also need the product name, price, and other attributes. We can fill in this missing information during stream processing by looking it up in a dimension table.

Dimension tables are generally kept in external storage such as MySQL, HBase, or Redis. In this post we take MySQL as the example and look at how dimension tables are used in Flink.

LookupableTableSource

Flink provides the LookupableTableSource interface, which can be used to implement dimension tables: it allows us to query external storage by key columns and use the returned data to enrich the stream.

public interface LookupableTableSource<T> extends TableSource<T> {

	TableFunction<T> getLookupFunction(String[] lookupKeys);

	AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);

	boolean isAsyncEnabled();
}

We see that LookupableTableSource has three methods (a minimal skeleton of an implementation follows the list):

  • getLookupFunction: used for synchronous dimension table lookups. It returns a TableFunction, so it is essentially a user-defined UDTF.
  • getAsyncLookupFunction: used for asynchronous dimension table lookups. It returns an AsyncTableFunction.
  • isAsyncEnabled: lookups are synchronous by default; to enable asynchronous lookups, this method must return true.
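To make this more concrete, here is a rough, hypothetical skeleton of a dimension table source that only supports synchronous lookups. It is not taken from the Flink code base: the class names MyDimTableSource and MyLookupFunction are made up for this post, and it assumes the remaining TableSource methods have usable defaults in Flink 1.11.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.types.Row;

// Hypothetical dimension table source that only supports synchronous lookups.
public class MyDimTableSource implements LookupableTableSource<Row> {

	@Override
	public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
		// return a user-defined TableFunction that queries external storage
		// using the given key columns
		return new MyLookupFunction(lookupKeys);
	}

	@Override
	public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
		// never called here, because isAsyncEnabled() returns false
		throw new UnsupportedOperationException("async lookup not supported");
	}

	@Override
	public boolean isAsyncEnabled() {
		return false; // the planner will use getLookupFunction() instead
	}

	@Override
	public TableSchema getTableSchema() {
		return TableSchema.builder()
				.field("id", DataTypes.INT())
				.field("name", DataTypes.STRING())
				.build();
	}

	// Hypothetical lookup function; the real work would happen in eval().
	public static class MyLookupFunction extends TableFunction<Row> {

		private final String[] lookupKeys;

		public MyLookupFunction(String[] lookupKeys) {
			this.lookupKeys = lookupKeys;
		}

		// invoked once per input row with the values of the key columns
		public void eval(Object... keys) {
			// query external storage here and emit matches, e.g. collect(Row.of(id, name));
		}
	}
}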

In Flink there are four classes that implement this interface: JdbcTableSource, HBaseTableSource, CsvTableSource, and HiveTableSource. In this post we mainly use JDBC as the example to show how dimension table lookups are done.

Worked example

Let’s walk through a quick example. First we define the stream source, using the datagen connector provided in Flink 1.11 to generate the data.

We simulate user data here, generating only a user id that ranges from 1 to 100.

CREATE TABLE datagen (
 userid int,
 proctime as PROCTIME()
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='100',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100'
)

For more details on how to use datagen, see my earlier post:

Talk about DataGen Connector, a random data generator in Flink 1.11

Next, define a MySQL dimension table:

CREATE TABLE dim_mysql (
  id int,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/test',
   'table-name' = 'userinfo',
   'username' = 'root',
   'password' = 'root'
)

We assume the userinfo table in MySQL already contains some user records (id and name) to join against.

Finally, we run the query that joins the stream table with the dimension table:

SELECT * FROM datagen LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime  ON datagen.userid = dim_mysql.id

The following is an example of the result:

3> 53,2020-09-03T07:xx:xx.565,null,null
3> 73,2020-09-03T07:xx:xx.566,null,null
1> 14,2020-09-03T07:xx:xx.566,14,aaddda
2> 11,2020-09-03T07:xx:xx.566,null,null
4> 8,2020-09-03T07:xx:xx.566,8,name8
1> 61,2020-09-03T07:xx:xx.567,null,null
3> 12,2020-09-03T07:xx:xx.567,12,aaa
2> 99,2020-09-03T07:xx:xx.567,null,null
4> 37,2020-09-03T07:xx:xx.568,null,null
2> 13,2020-09-03T07:xx:xx.569,13,aaddda
3> 6,2020-09-03T07:xx:xx.568,6,name6

We can see that records whose key exists in the dimension table are enriched with the dimension data, while records whose key does not exist are filled with NULL.

For the complete code, see: github.com/zhangjun0x0…

Source code analysis

JdbcTableSource

Using JDBC as the example, let’s see what Flink does under the hood.

The JdbcTableSource#isAsyncEnabled method returns false, i.e. asynchronous lookups are not supported, so we go to the JdbcTableSource#getLookupFunction method:

	@Override
	public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
		final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
		return JdbcLookupFunction.builder()
				.setOptions(options)
				.setLookupOptions(lookupOptions)
				.setFieldTypes(rowTypeInfo.getFieldTypes())
				.setFieldNames(rowTypeInfo.getFieldNames())
				.setKeyNames(lookupKeys)
				.build();
	}

In the end, a JdbcLookupFunction object is constructed:

  • options holds the JDBC connection parameters, such as the user, password, and URL.
  • lookupOptions holds the lookup-related parameters of the dimension table, such as the cache size and cache expiration time.
  • lookupKeys are the columns used to look up (join against) the dimension table.

JdbcLookupFunction

Next, let’s look at JdbcLookupFunction. JdbcLookupFunction is a subclass of TableFunction; for background on custom TableFunctions, see:

Flink tutorial – Custom functions for TableFunction

The core of a TableFunction is its eval method. In JdbcLookupFunction the main work there is to splice the lookup keys into a SQL statement and fetch the matching rows. The cache is consulted first: if the keys are already cached, the cached rows are returned directly; otherwise the database is queried and the result is put into the cache, so the next lookup for the same keys is served from the cache.
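As an illustration of that flow, here is a simplified, hypothetical lookup function. It is not Flink’s actual JdbcLookupFunction: the class name UserInfoLookupFunction, the SQL statement, and the cache settings are made up for this post, and it assumes Guava is on the classpath and that exactly one row is cached per key.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Simplified, hypothetical lookup function -- not Flink's JdbcLookupFunction.
public class UserInfoLookupFunction extends TableFunction<Row> {

	private transient Connection connection;
	private transient PreparedStatement statement;
	private transient Cache<Integer, Row> cache;

	@Override
	public void open(FunctionContext context) throws Exception {
		connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
		statement = connection.prepareStatement("SELECT id, name FROM userinfo WHERE id = ?");
		// LRU cache with a TTL, in the spirit of what JdbcLookupFunction#open builds
		cache = CacheBuilder.newBuilder()
				.maximumSize(1000)
				.expireAfterWrite(60, TimeUnit.SECONDS)
				.build();
	}

	// called once per input record with the join key
	public void eval(Integer userId) throws Exception {
		Row cached = cache.getIfPresent(userId);
		if (cached != null) {
			collect(cached); // cache hit: no database round trip
			return;
		}
		statement.setInt(1, userId);
		try (ResultSet rs = statement.executeQuery()) {
			while (rs.next()) {
				Row row = Row.of(rs.getInt("id"), rs.getString("name"));
				cache.put(userId, row); // cache miss: query MySQL, then remember the result
				collect(row);
			}
		}
	}

	@Override
	public void close() throws Exception {
		if (statement != null) {
			statement.close();
		}
		if (connection != null) {
			connection.close();
		}
	}
}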

Why add a cache at all? By default the cache is disabled, so every incoming record triggers a query against the dimension table; with a large data volume this puts constant pressure on the storage system backing the dimension table. Flink therefore provides an LRU cache: lookups check the cache first and only query the external system on a miss. However, if a key is queried frequently and keeps hitting the cache, it would never pick up new data, so the cache also needs an expiration time; once an entry expires it is evicted and the external system is queried again for fresh data.

So how is the cache actually enabled? Let’s look at the JdbcLookupFunction#open method:

@Override
public void open(FunctionContext context) throws Exception {
	try {
		establishConnectionAndStatement();
		this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
				.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
				.maximumSize(cacheMaxSize)
				.build();
	} catch (SQLException sqe) {
		throw new IllegalArgumentException("open() failed.", sqe);
	} catch (ClassNotFoundException cnfe) {
		throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
	}
}

In other words, cacheMaxSize and cacheExpireMs must both be set for the cache object to be created and used for caching lookup results. The corresponding DDL properties are lookup.cache.max-rows and lookup.cache.ttl.
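As a small sketch of how these two properties are set, the snippet below repeats the dim_mysql DDL from earlier with the lookup cache switched on; the cache values (1000 rows, 60 seconds) are only illustrative.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DimTableCacheExample {

	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
				EnvironmentSettings.newInstance().inStreamingMode().build());

		// Same dim_mysql table as before, but with the lookup cache switched on.
		// Both options must be present, otherwise the cache stays disabled.
		tEnv.executeSql(
				"CREATE TABLE dim_mysql (" +
				"  id INT," +
				"  name STRING," +
				"  PRIMARY KEY (id) NOT ENFORCED" +
				") WITH (" +
				"  'connector' = 'jdbc'," +
				"  'url' = 'jdbc:mysql://localhost:3306/test'," +
				"  'table-name' = 'userinfo'," +
				"  'username' = 'root'," +
				"  'password' = 'root'," +
				"  'lookup.cache.max-rows' = '1000'," + // maps to cacheMaxSize
				"  'lookup.cache.ttl' = '60s'" +        // maps to cacheExpireMs
				")");
	}
}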

As for the concrete cache size and expiration time, there is no universal setting: users have to decide based on their own scenario and make a trade-off between data accuracy and system throughput.

For more practical content, follow my WeChat official account [Big Data Technology and Application Practice].