Author: Fu Dian

Background

Python custom functions are one of the most important features of the PyFlink Table API. They allow users to call functions developed in Python from the PyFlink Table API, greatly expanding the scope of what the Python Table API can do.

At present, support for Python custom functions is quite mature, covering various kinds of functions such as scalar functions (UDF), table functions (UDTF), aggregate functions (UDAF), table aggregate functions (UDTAF), Pandas UDFs, and Pandas UDAFs. Next, we will take a closer look at how to use Python custom functions in PyFlink Table API jobs.

Python custom function basics

According to the number of rows of input/output data, custom functions in the Flink Table API & SQL can be divided into the following categories:

| Custom function     | Single Row Input | Multiple Row Input     |
|---------------------|------------------|------------------------|
| Single Row Output   | ScalarFunction   | AggregateFunction      |
| Multiple Row Output | TableFunction    | TableAggregateFunction |

PyFlink supports all four types of custom functions. Let's take a look at how each type is used.

Python UDF

A Python UDF, i.e. a Python ScalarFunction, produces exactly one output row for each input row. For example, the following shows several ways to define a Python UDF named "sub_string":

from pyflink.table.udf import udf, FunctionContext, ScalarFunction
from pyflink.table import DataTypes
import functools


# Option 1: define a Python function and decorate it with @udf
@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
    return s[begin:end]


# Option 2: wrap a lambda function
sub_string = udf(lambda s, begin, end: s[begin:end], result_type=DataTypes.STRING())


# Option 3: wrap a callable class
class SubString(object):

    def __call__(self, s: str, begin: int, end: int):
        return s[begin:end]

sub_string = udf(SubString(), result_type=DataTypes.STRING())


# Option 4: wrap a partial function
def sub_string(s: str, begin: int, end: int):
    return s[begin:end]

sub_string_begin_1 = udf(functools.partial(sub_string, begin=1), result_type=DataTypes.STRING())


# Option 5: extend the base class ScalarFunction
class SubString(ScalarFunction):

    def open(self, function_context: FunctionContext):
        pass

    def eval(self, s: str, begin: int, end: int):
        return s[begin:end]

sub_string = udf(SubString(), result_type=DataTypes.STRING())

Description:

  • The "udf" decorator declares that this is a scalar function;
  • The result type of the scalar function must be declared via the result_type parameter of the decorator;
  • Defining a Python UDF by extending ScalarFunction has the following advantages:
    • The base class UserDefinedFunction of ScalarFunction defines an open method, which is executed only once during job initialization, so it can be used for one-time work such as loading machine learning models or connecting to external services;
    • In addition, metrics can be registered and used through the function_context parameter of the open method (see the sketch below).
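For example, here is a minimal sketch of a ScalarFunction that uses open() both for one-time setup and to register a counter metric (the function and metric names are illustrative, not from the original example):

from pyflink.table import DataTypes
from pyflink.table.udf import udf, FunctionContext, ScalarFunction


class SubStringWithMetrics(ScalarFunction):

    def open(self, function_context: FunctionContext):
        # one-time initialization; a counter metric is registered here
        self.calls = function_context.get_metric_group().counter("sub_string_calls")

    def eval(self, s: str, begin: int, end: int):
        self.calls.inc()
        return s[begin:end]


sub_string_with_metrics = udf(SubStringWithMetrics(), result_type=DataTypes.STRING())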
Once the Python UDF is defined, it can be used directly in the Python Table API:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

table = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])
table.select(sub_string(table.a, 1, 3))
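Note that the select call above only builds the query. As a minimal sketch (not part of the original example), one way to actually run it locally and inspect the result is to execute the table and print it:

result = table.select(sub_string(table.a, 1, 3))
# execute the query and print the rows to the console (handy for local testing)
result.execute().print()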

Python UDTF

A Python UDTF, i.e. a Python TableFunction, can produce zero, one, or more output rows for each input row, and a single output row can contain multiple columns. For example, let's define a Python UDTF named "split" that splits an input string into two strings using the specified delimiter:

from pyflink.table.udf import udtf
from pyflink.table import DataTypes

@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str, sep: str):
    splits = s.split(sep)
    yield splits[0], splits[1]

Description:

  • The "udtf" decorator declares that this is a table function;
  • The result type of the table function must be declared via the result_types parameter of the decorator. Since each invocation of a table function can produce rows with multiple columns, result_types must specify the types of all output columns;
  • Python UDTFs also support most of the definition styles listed in the Python UDF section; only one of them is shown here, and a class-based sketch follows below.
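For instance, the same "split" function can also be defined by extending TableFunction; a minimal sketch, functionally equivalent to the decorator form above:

from pyflink.table import DataTypes
from pyflink.table.udf import udtf, TableFunction


class Split(TableFunction):

    def eval(self, s: str, sep: str):
        # yield one output row (with two columns) per input row
        parts = s.split(sep)
        yield parts[0], parts[1]


split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.STRING()])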

Once you have defined the Python UDTF, you can use it directly in the Python Table API:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

table = t_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])

table.join_lateral(split(table.a, '|').alias("c1, c2"))
table.left_outer_join_lateral(split(table.a, '|').alias("c1, c2"))

Python UDAF

A Python UDAF, i.e. a Python AggregateFunction, aggregates a set of data, such as all the rows in the same window or all the rows with the same key, and produces a single output row for each set of input data. For example, the following defines a Python UDAF named "weighted_avg":

from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udaf


class WeightedAvg(AggregateFunction):

    def create_accumulator(self):
        # Row(sum, count)
        return Row(0, 0)

    def get_value(self, accumulator: Row) -> float:
        if accumulator[1] == 0:
            return 0
        else:
            return accumulator[0] / accumulator[1]

    def accumulate(self, accumulator: Row, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight

    def retract(self, accumulator: Row, value, weight):
        accumulator[0] -= value * weight
        accumulator[1] -= weight


weighted_avg = udaf(f=WeightedAvg(),
                    result_type=DataTypes.DOUBLE(),
                    accumulator_type=DataTypes.ROW([
                        DataTypes.FIELD("f0", DataTypes.BIGINT()),
                        DataTypes.FIELD("f1", DataTypes.BIGINT())]))

Description:

  • The "udaf" decorator declares that this is an aggregate function;
  • The result type and the accumulator type of the aggregate function must be declared via the result_type and accumulator_type parameters of the decorator;
  • The create_accumulator, get_value, and accumulate methods must be defined; the retract method can be defined as needed. For details, see the official Flink documentation [1]. Note that a Python UDAF can only be defined by extending AggregateFunction (a Pandas UDAF has no such restriction; a sketch follows the usage example below).

Once Python UDAF is defined, you can use it in the Python Table API like this:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

t = t_env.from_elements([(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")],
                        ["value", "count", "name"])

t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg"))
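As noted above, a Pandas UDAF does not have to extend AggregateFunction: it can be defined from a plain function that receives pandas.Series. A minimal sketch following the general pattern of the Flink documentation (mean_udaf is an illustrative name, not part of the example above):

import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import udaf


# the input v is a pandas.Series containing all values of the group/window;
# the return value is the aggregated result
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v: pd.Series):
    return v.mean()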

Python UDTAF

Python UDTAF stands for Python TableAggregateFunction. Like a Python UDAF, a Python UDTAF aggregates a set of data, such as all the rows in the same window or all the rows with the same key, but it can produce zero, one, or even more output rows.

The following example defines a Python UDTAF named Top2:

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtaf, TableAggregateFunction


class Top2(TableAggregateFunction):

    def create_accumulator(self):
        # the accumulator stores the two largest values seen so far
        return [None, None]

    def accumulate(self, accumulator, input_row):
        if input_row[0] is not None:
            # new largest value
            if accumulator[0] is None or input_row[0] > accumulator[0]:
                accumulator[1] = accumulator[0]
                accumulator[0] = input_row[0]
            # new second largest value
            elif accumulator[1] is None or input_row[0] > accumulator[1]:
                accumulator[1] = input_row[0]

    def emit_value(self, accumulator):
        yield Row(accumulator[0])
        if accumulator[1] is not None:
            yield Row(accumulator[1])


top2 = udtaf(f=Top2(),
             result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),
             accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))

Description:

  • Python UDTAF is a new feature supported since Flink 1.13;
  • The create_accumulator, accumulate, and emit_value methods must be defined; retract and merge are also supported in TableAggregateFunction and can be defined as needed. For details, see the official Flink documentation [2].

Once you have defined the Python UDTAF, you can use it in the Python Table API like this:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

t = t_env.from_elements([(1, 'Hi', 'Hello'),
                         (3, 'Hi', 'hi'),
                         (5, 'Hi2', 'hi'),
                         (2, 'Hi', 'Hello'),
                         (7, 'Hi', 'Hello')],
                        ['a', 'b', 'c'])

t_env.execute_sql("""
    CREATE TABLE my_sink (
      word VARCHAR,
      `sum` BIGINT
    ) WITH (
      'connector' = 'print'
    )
""")

result = t.group_by(t.b) \
    .flat_aggregate(top2) \
    .select("b, a") \
    .execute_insert("my_sink")

# 1) Wait for the job to finish when executing locally.
# 2) Remove this call when the job is submitted in detached mode to a remote
#    cluster, e.g. YARN / Standalone / K8s.
result.wait()

When you execute the above program, you can see output similar to the following:

11> +I[Hi, 7]
10> +I[Hi2, 5]
11> +I[Hi, 3]

Description:

  • Python UDTAF can only be used in the Table API, not in SQL statements;
  • The result of flat_aggregate contains both the original grouping column and the UDTAF (top2) output, so the column "b" can be accessed in the select.

Advanced Python custom functions

Use Python custom functions in a pure SQL job

The CREATE FUNCTION statement in Flink SQL supports registering Python custom functions, so in addition to PyFlink Table API jobs, Python custom functions can also be used in pure SQL jobs.

CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON;

CREATE TABLE source (
  a VARCHAR
) WITH (
  'connector' = 'datagen'
);

CREATE TABLE sink (
  a VARCHAR
) WITH (
  'connector' = 'print'
);

INSERT INTO sink
SELECT sub_string(a, 1, 3)
FROM source;
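The DDL above assumes a Python module named test_udf that defines sub_string and is shipped with the job (e.g. via -pyfs). A minimal sketch of what such a module might look like:

# test_udf.py (the module name is taken from the DDL above)
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
    return s[begin:end]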

Use Python custom functions in a Java job

Users can register Python custom functions through DDL, which means that Python custom functions can also be used in Java Table API jobs, for example:

TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
tEnv.executeSql("CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON");
tEnv.createTemporaryView("source", tEnv.fromValues("hello", "world", "flink").as("a"));
tEnv.executeSql("SELECT sub_string(a) FROM source").collect();

See PyFlink Playground [3] for a detailed example.

An important use of this feature is to mix Java operators with Python operators: you can develop most of the job logic in Java, and when some parts of it must be written in Python, call a custom function written in Python through the approach above.

If DataStream is used, you can convert DataStream to a Table and then call a custom function written in Python.

Dependency management

Accessing third-party Python libraries in Python custom functions is a very common requirement, and in machine learning prediction scenarios users may also need to load a machine learning model inside a Python custom function. When running PyFlink jobs in local mode, we can install third-party Python libraries into the local Python environment or download the machine learning model to the local machine; however, when submitting PyFlink jobs for remote execution, some problems arise:

  • How third-party Python libraries can be accessed by Python custom functions. Different jobs may require different versions of the same Python library, so pre-installing third-party libraries into the cluster's Python environment only works for a few common dependencies and cannot satisfy per-job Python dependencies.
  • How machine learning models or data files can be distributed to the cluster nodes and ultimately accessed by Python custom functions.

Dependencies may also include JAR packages. PyFlink provides solutions for each kind of dependency:

| Dependency type | flink run parameter | Configuration option | API | Purpose | Example (flink run) |
|---|---|---|---|---|---|
| Job entry file | -py / --python | none | none | Specifies the entry file of the job; only .py files are supported | -py file:///path/to/table_api_demo.py |
| Job entry module | -pym / --pyModule | none | none | Specifies the entry module of the job; similar to --python, but more general, e.g. when the job's Python files are packaged as a zip and cannot be specified via --python | -pym table_api_demo -pyfs file:///path/to/table_api_demo.py |
| Python library files | -pyfs / --pyFiles | python.files | add_python_file | Specifies one or more Python files (.py/.zip/.whl etc., comma-separated); they are placed on the PYTHONPATH of the Python process during job execution and can be accessed directly in Python custom functions | -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip |
| Archive files | -pyarch / --pyArchives | python.archives | add_python_archive | Specifies one or more archive files (comma-separated); they are extracted into the working directory of the Python process during job execution and can be accessed via relative paths | -pyarch file:///path/to/venv.zip |
| Python interpreter path | -pyexec / --pyExecutable | python.executable | set_python_executable | Specifies the path of the Python interpreter used to execute the job | -pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3 |
| Requirements file | -pyreq / --pyRequirements | python.requirements | set_python_requirements | Specifies a requirements file that defines the job's third-party Python library dependencies; during execution they are installed via pip according to its contents | -pyreq requirements.txt |
| JAR packages | none | pipeline.classpaths, pipeline.jars | No dedicated API; can be set via the set_string method of the configuration | Specifies the JAR packages the job depends on, usually connector JAR packages | none |
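The API column of the table maps to methods on the TableEnvironment and its configuration. A minimal sketch of declaring dependencies programmatically (the paths are placeholders; t_env is a TableEnvironment created as in the earlier examples):

# ship an additional Python file/zip; it is placed on the PYTHONPATH of the
# Python workers and can be imported from Python custom functions
t_env.add_python_file("/path/to/deps.zip")

# install third-party libraries from a requirements file when the job runs
t_env.set_python_requirements("/path/to/requirements.txt")

# JAR dependencies (e.g. connector JARs) are configured via pipeline.jars
t_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file:///path/to/connector.jar")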

Description:

  • Note that the file implementing the Python UDF itself also needs to be uploaded as a dependency when the job is submitted;
  • By combining "archive files" and "Python interpreter path", you can make the job run with an uploaded Python virtual environment, for example:
table_env.add_python_archive("/path/to/py_env.zip")
# Specify the Python interpreter included in py_env.zip to execute the
# user-defined functions; a relative path must be used.
table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
  • It is recommended to use Conda to build the Python virtual environment. A virtual environment built with Conda contains most of the underlying libraries required to run Python, which largely avoids failures caused by missing underlying libraries when the local environment differs from the cluster environment. For details on how to build a Python virtual environment with Conda, refer to the "Using a third-party Python package" chapter of the Alibaba Cloud VVP documentation [4];
  • Some third-party Python libraries need to be installed before they can be used, i.e. it is not enough to download them and put them on the PYTHONPATH. There are two solutions for such libraries:
    • Install them into a Python virtual environment and run the job with that virtual environment;
    • Find a machine (or Docker image) whose environment matches the cluster, install the required third-party libraries there, and then package the installed files. Compared with a full Python virtual environment, the resulting package is relatively small. For details, refer to the "Using a custom Python virtual environment" section of the Alibaba Cloud VVP documentation [5].

Debugging

PyFlink supports remote debugging of Python custom functions, as described in the “Remote Debugging” section of the article “How to Develop PyFlink API Jobs from 0 to 1” [6].

In addition, you can print logs in Python custom functions via logging. Note that the log output appears in the TaskManager log files, not in the current console. For details, see the "Custom Logging" section of "How to Develop PyFlink API Jobs from 0 to 1" [6]. When running a job in local mode, the TaskManager logs are located in the PyFlink installation directory, which can be found like this:

>>> import pyflink
>>> print(pyflink.__path__)
['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink']
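As a minimal sketch of the logging approach mentioned above (the function itself is just an illustration), log statements inside a Python custom function end up in the TaskManager log file:

import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def sub_string_with_log(s: str, begin: int, end: int):
    # this message is written to the TaskManager log, not the client console
    logging.info("sub_string called with input: %s", s)
    return s[begin:end]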

Tuning

The performance of Python custom functions depends largely on the implementation of the Python custom functions themselves. If you encounter performance problems, you first need to find ways to optimize the implementation of Python custom functions as much as possible.

In addition, the performance of Python custom functions is affected by the following parameters.

| Parameter | Description |
|---|---|
| python.fn-execution.bundle.size | Python custom functions are executed asynchronously: during job execution, the Java operator sends data asynchronously to the Python process for processing. The Java operator caches the data and sends it to the Python process once a threshold is reached. python.fn-execution.bundle.size controls the maximum number of records that can be cached; the default value is 100000. |
| python.fn-execution.bundle.time | Controls the maximum caching time of the data. When either the number of cached records reaches the threshold defined by python.fn-execution.bundle.size or the caching time reaches the threshold defined by python.fn-execution.bundle.time, the cached data is processed. The default value is 1000 (milliseconds). |
| python.fn-execution.arrow.batch.size | Controls the maximum number of records an Arrow batch can contain when Pandas UDFs are used; the default value is 10000. Note that python.fn-execution.arrow.batch.size must not be greater than python.fn-execution.bundle.size. |

Description:

  • If the values of the above parameters are too large, too much data has to be processed during a checkpoint, which can make checkpoints take too long or even fail. If the checkpoint time of a job is long, try reducing the values of the above parameters; a sketch of how to set them follows below.
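These parameters are regular configuration options; a minimal sketch of lowering them through the TableEnvironment configuration (the values are arbitrary examples, not recommendations, and t_env is assumed to be created as in the earlier examples):

config = t_env.get_config().get_configuration()
# cache at most 10000 records / 500 ms before handing them to the Python process
config.set_string("python.fn-execution.bundle.size", "10000")
config.set_string("python.fn-execution.bundle.time", "500")
# must not exceed python.fn-execution.bundle.size
config.set_string("python.fn-execution.arrow.batch.size", "5000")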

Q&A

1) The actual return type of a Python custom function is inconsistent with the type declared in result_type. This causes the Java operator to report an error when deserializing the execution results of the Python custom function, with an error stack similar to the following:

Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
	at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
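As an illustration of this kind of mismatch (a hypothetical example, not taken from the article's code), the following UDF declares STRING as its result type but actually returns an int, which triggers a deserialization error like the one above:

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def buggy_length(s: str):
    return len(s)  # bug: returns an int; should be str(len(s))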

2) An object that cannot be serialized by cloudpickle is instantiated in the __init__ method of a Python custom function.

When submitting a job, PyFlink serializes the Python custom functions via cloudpickle. If a Python custom function contains an object that cannot be serialized by cloudpickle, you will encounter an error similar to TypeError: can't pickle XXX; such objects should be initialized in the open method instead.

3) A very large data file is loaded in the __init__ method of a Python custom function.

Since PyFlink serializes Python custom functions via cloudpickle when the job is submitted, if a very large data file is loaded in the __init__ method, the entire data file will be serialized as part of the Python custom function, and the job may fail to execute. Load the data file in the open method instead, as in the sketch below.
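A minimal sketch of the recommended fix for items 2) and 3): keep __init__ lightweight and create heavyweight or non-picklable state in open(), which runs on the TaskManager. The file path and function are placeholders; the data file would be shipped e.g. via -pyfs or -pyarch.

import pickle

from pyflink.table import DataTypes
from pyflink.table.udf import udf, FunctionContext, ScalarFunction


class Lookup(ScalarFunction):

    def __init__(self):
        # everything assigned here is serialized by cloudpickle at submission time
        self.mapping = None

    def open(self, function_context: FunctionContext):
        # load the large data file on the TaskManager side instead
        with open("/path/to/big_lookup.pkl", "rb") as f:
            self.mapping = pickle.load(f)

    def eval(self, key: str):
        return self.mapping.get(key, "unknown")


lookup = udf(Lookup(), result_type=DataTypes.STRING())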

4) The Python environment of the client is inconsistent with the Python environment of the cluster, for example, the Python version or the PyFlink version differs (the major versions of PyFlink must match, e.g. both 1.12.x).

Conclusion

In this article, we introduced the definition and use of the various kinds of Python custom functions, as well as Python dependency management, debugging, and tuning of Python custom functions, hopefully helping users better understand Python custom functions. We will continue the PyFlink series of articles to help PyFlink users gain deeper insight into the various features, application scenarios, best practices, and more.

In addition, the Alibaba Cloud real-time computing ecosystem team is continuously recruiting excellent big data talent (both interns and full-time hires). Our work includes:

  • Real-time machine learning: support real-time feature engineering and AI engine coordination in machine learning scenarios, build a real-time machine learning standard based on Apache Flink and its ecosystem, and push scenarios such as search, recommendation, advertising, and risk control toward fully real-time processing;

  • Big data + AI integration: including programming language integration (PyFlink related work), execution engine integration (TF on Flink), workflow and management integration (Flink AI Flow).

If you are interested in open source, big data or AI, please send your resume to [email protected]

Reference links

[1] ci.apache.org/projects/fl…

[2] ci.apache.org/projects/fl…

[3] github.com/pyflink/pla…

[4] help.aliyun.com/document_de…

[5] help.aliyun.com/document_de…

[6] mp.weixin.qq.com/s/GyFTjQl6c…