Flink DataStream API is mainly divided into three parts: Source, Transformation, and Sink. Source is the data Source. Flink has many built-in data sources, such as Kafka, which is the most commonly used. Transformation is a specific Transformation operation, mainly user-defined logic for processing data, such as Map and FlatMap. Sink is the output of data, which can output the processed data to storage devices. Flink has many built-in sinks, such as Kafka, HDFS, etc. In addition to Flink’s built-in Source and Sink, users can achieve self-defined Source and Sink. Considering that the use of built-in Source and Sink is relatively simple and convenient, the use of built-in Source and Sink is not within the scope of discussion in this paper. This paper will start from the custom Source, then describe the use of some common operators in detail, and finally achieve a custom Sink.

The data source

Flink internally implements commonly used data sources, such as file-based, socket-based, aggregation-based and so on. If these cannot meet the requirements, users can customize data sources. MySQL will be used as an example to implement a customized data source. This data source will be used for all operations in this article, with the following code:

/ * * *@Created with IntelliJ IDEA.
 *  @author : jmx
 *  @Date: 2020/4/14
 *  @Time: 17:34 * note: RichParallelSourceFunction and SourceContext must add generics * /
public class MysqlSource extends RichParallelSourceFunction<UserBehavior> {
    public Connection conn;
    public PreparedStatement pps;
    private String driver;
    private String url;
    private String user;
    private String pass;

    /** * This method is called only once at the beginning * this method is used to get connections **@param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize database connection parameters
        Properties properties = new Properties();
        URL fileUrl = TestProperties.class.getClassLoader().getResource("mysql.ini");
        FileInputStream inputStream = new FileInputStream(new File(fileUrl.toURI()));
        properties.load(inputStream);
        inputStream.close();
        driver = properties.getProperty("driver");
        url = properties.getProperty("url");
        user = properties.getProperty("user");
        pass = properties.getProperty("pass");
        // Get the data connection
        conn = getConection();
        String scanSQL = "SELECT * FROM user_behavior_log";
        pps = conn.prepareStatement(scanSQL);
    }

    @Override
    public void run(SourceContext<UserBehavior> ctx) throws Exception {
        ResultSet resultSet = pps.executeQuery();
        while (resultSet.next()) {
            ctx.collect(UserBehavior.of(
                    resultSet.getLong("user_id"),
                    resultSet.getLong("item_id"),
                    resultSet.getInt("cat_id"),
                    resultSet.getInt("merchant_id"),
                    resultSet.getInt("brand_id"),
                    resultSet.getString("action"),
                    resultSet.getString("gender"),
                    resultSet.getLong("timestamp"))); }}@Override
    public void cancel(a) {}/** * close the connection */
    @Override
    public void close(a) {
        if(pps ! =null) {
            try {
                pps.close();
            } catch(SQLException e) { e.printStackTrace(); }}if(conn ! =null) {
            try {
                conn.close();
            } catch(SQLException e) { e.printStackTrace(); }}}/** * get database connection **@return
     * @throws SQLException
     */
    public Connection getConection(a) throws IOException {
        Connection connnection = null;

        try {
            // Load the driver
            Class.forName(driver);
            // Get the connection
            connnection = DriverManager.getConnection(
                    url,
                    user,
                    pass);
        } catch (Exception e) {
            e.printStackTrace();
        }
        returnconnnection; }}Copy the code

First inherit RichParallelSourceFunction, implementation inheritance, the method of mainly includes the open () method, the run () method and the close method. The above

RichParallelSourceFunction is to support the setting of parallelism, more about the difference between the RichParallelSourceFunction and RichSourceFunction, the former allows users to set up more parallelism, The latter does not support through setParallelism set parallelism () method, the parallelism of the default is 1, otherwise will be submitted to the following error: bashException in thread “main” Java. Lang. IllegalArgumentException: The maximum parallelism of non parallel operator must be 1.

In addition, RichParallelSourceFunction provides additional open () method and the close () method, if you need for a link when defining the Source, you can in the open () method to initialize, and then close () method of close links to resources, As for the difference between Rich***Function and ordinary Function, it will be explained in detail below. Here is an impression first. The configuration information in the above code is passed through the configuration file. Due to space constraints, I will put the code in this article on Github. See github address at the end of this article.

The basic transformation

Flink provides a large number of operators for users to use, common operators mainly include the following types, note: this article does not discuss about time and window based operators, these contents will be detailed in “Flink Time and window based operators”.

Note: The operation of this paper is based on the user-defined MySQL Source above, and the corresponding data interpretation is as follows:

userId;     / / user ID
itemId;     ID / / commodities
catId;      // Product category ID
merchantId; / / the seller ID
brandId;    / / brand ID
action;     // User behavior, including ("pv", "buy", "cart", "FAv ")
gender;     / / gender
timestamp;  // The timestamp when the behavior occurred, in seconds
Copy the code

Map

explain

DataStream → DataStream, input an element, return an element, as follows:

SingleOutputStreamOperator<String> userBehaviorMap = userBehavior.map(new RichMapFunction<UserBehavior, String>() {
            @Override
            public String map(UserBehavior value) throws Exception {
                String action = "";
                switch (value.action) {
                    case "pv":
                        action = "Browse";
                    case "cart":
                        action = "Purchased";
                    case "fav":
                        action = "Collection";
                    case "buy":
                        action = "Buy";
                }
                returnaction; }});Copy the code

Schematic diagram

A map operation that converts a raindrop shape into a corresponding circular shape

flatMap

explain

DataStream → DataStream, enter an element and return zero, one, or more elements. In fact, the flatMap operator can be regarded as a generalization of filter and Map, that is, it can realize these two operations. The corresponding FlatMapFunction of the flatMap operator defines the flatMap method, which can return zero, one or more events as a result by passing data to the Collector object. The operations are as follows:

SingleOutputStreamOperator<UserBehavior> userBehaviorflatMap = userBehavior.flatMap(new RichFlatMapFunction<UserBehavior, UserBehavior>() {
            @Override
            public void flatMap(UserBehavior value, Collector<UserBehavior> out) throws Exception {
                if (value.gender.equals("Female")) { out.collect(value); }}});Copy the code

Schematic diagram

Filter out the yellow raindrops, round the blue raindrops, and keep the green ones

Filter

explain

DataStream → DataStream: the filter operator determines the data. If the data meets the conditions, the data that returns true will be retained. Otherwise, the data will be filtered. As follows:

  SingleOutputStreamOperator<UserBehavior> userBehaviorFilter = userBehavior.filter(new RichFilterFunction<UserBehavior>() {
            @Override
            public boolean filter(UserBehavior value) throws Exception {
                return value.action.equals("buy");// Keep the purchase behavior data}});Copy the code

Schematic diagram

Filter out the red and green raindrops, keeping the blue ones.

keyBy

explain

DataStream→KeyedStream, which logically divides streams into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy () is implemented via hash partitioning. There are three ways to define a key: (1) use the position of a field, such as keyBy(1), which is used for tuple data types, such as tuple, using the position of the corresponding element of the tuple to define a key; (2) Field expressions for tuples, POJOs, and sample classes; (3) a keySelector, or keySelector, can extract keys from input events

SingleOutputStreamOperator<Tuple2<String, Integer>> userBehaviorkeyBy = userBehavior.map(new RichMapFunction<UserBehavior, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(UserBehavior value) throws Exception {
                return Tuple2.of(value.action.toString(), 1);
            }
        }).keyBy(0) // Scala tuple numbering starts at 1 and Java tuple numbering starts at 0
           .sum(1); // Scroll through the aggregation
Copy the code

Schematic diagram

KeyBy operations that partition events based on shapes

Reduce

explain

KeyedStream → DataStream performs rolling aggregation of data, combining the current element with the value returned by the last Reduce, and then returning a new value. Apply a ReduceFunction to a keyedStream, and every coming event will be aggregated with the result of current Reduce to generate a new DataStream. This operator will not change the data type, so the type of input stream and output stream will always be consistent.

SingleOutputStreamOperator<Tuple2<String, Integer>> userBehaviorReduce = userBehavior.map(new RichMapFunction<UserBehavior, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(UserBehavior value) throws Exception {
                return Tuple2.of(value.action.toString(), 1);
            }
        }).keyBy(0) // Scala tuple numbering starts at 1 and Java tuple numbering starts at 0
          .reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
              @Override
              public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                  return Tuple2.of(value1.f0,value1.f1 + value2.f1);// Scroll aggregate, which is similar to sum}});Copy the code

Schematic diagram

Aggregations

KeyedStream → DataStream, Aggregations(rolling Aggregations). The rolling Aggregations transform is applied to the KeyedStream to generate a DataStream containing the aggregate results (such as sum, min min). The scrolling aggregate transformation saves an aggregate result for each key value that flows through the operator, and updates the corresponding result value based on the previous result value and the current element value as new elements flow through the operator

  • Sum (): the sum of the specified fields that the rolling aggregate flows through the operator;
  • Min (): Scroll to calculate the minimum value of the specified field that flows through the operator
  • Max (): Scroll to calculate the maximum value of the specified field that flows through the operator
  • MinBy (): scroll to calculate the minimum value that has passed through the operator so far, and return the event corresponding to that value;
  • MaxBy (): scroll to calculate the maximum value that has passed through the operator so far, and return the event corresponding to that value;

union

explain

DataStream* → DataStream* → DataStream* → DataStream* → DataStream* → DataStream* → DataStream* → DataStream* → DataStream* → DataStream* → DataStream* → DataStream* → DataStream The union operator does not rerun the data, and each input event is sent to the downstream operator.

userBehaviorkeyBy.union(userBehaviorReduce).print();// Join two streams together. Multiple streams (greater than 2) can be supported
Copy the code

Schematic diagram

connect

explain

DataStream,DataStream → ConnectedStreams, which combines the events of the two streams and returns a ConnectedStreams object, The two streams can have different data types. The ConnectedStreams object provides operators similar to the map() and flatMap() functions, such as CoMapFunction and CoFlatMapFunction, which represent the map() and flatMap operators respectively. Note that CoMapFunction or CoFlatMapFunction does not control the order of events when called. This operator will be called whenever events flow through the operator.

ConnectedStreams<UserBehavior, Tuple2<String, Integer>> behaviorConnectedStreams = userBehaviorFilter.connect(userBehaviorkeyBy);
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> behaviorConnectedStreamsmap = behaviorConnectedStreams.map(new RichCoMapFunction<UserBehavior, Tuple2<String, Integer>, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map1(UserBehavior value1) throws Exception {
                return Tuple3.of("first", value1.action, 1);
            }
            @Override
            public Tuple3<String, String, Integer> map2(Tuple2<String, Integer> value2) throws Exception {
                return Tuple3.of("second", value2.f0, value2.f1); }});Copy the code

split

explain

DataStream → SplitStream, which splits a stream into two or more streams, as opposed to union. The segmented stream has the same data type as the input stream and can be routed to zero, one, or more output streams for each incoming event. Datastream.split () takes an OutputSelector function that defines the split rule, assigning streams that meet different criteria to an output named by the user.

 SplitStream<UserBehavior> userBehaviorSplitStream = userBehavior.split(new OutputSelector<UserBehavior>() {
            @Override
            public Iterable<String> select(UserBehavior value) {
                ArrayList<String> userBehaviors = new ArrayList<String>();
                if (value.action.equals("buy")) {
                    userBehaviors.add("buy");
                } else {
                    userBehaviors.add("other");
                }
                returnuserBehaviors; }}); userBehaviorSplitStream.select("buy").print();
Copy the code

Schematic diagram

Sink

Flink provides many built-in sinks, such as writeASText, print, HDFS, Kaka, etc. The following will implement a custom Sink based on MySQL, which can be compared with the custom MysqlSource, as follows:

/ * * *@Created with IntelliJ IDEA.
 *  @author : jmx
 *  @Date: 2020/4/16
 *  @Time: so * * /
public class MysqlSink extends RichSinkFunction<UserBehavior> {
    PreparedStatement pps;
    public Connection conn;
    private String driver;
    private String url;
    private String user;
    private String pass;
    /** * Initializes the connection in the open() method@param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize database connection parameters
        Properties properties = new Properties();
        URL fileUrl = TestProperties.class.getClassLoader().getResource("mysql.ini");
        FileInputStream inputStream = new FileInputStream(new File(fileUrl.toURI()));
        properties.load(inputStream);
        inputStream.close();
        driver = properties.getProperty("driver");
        url = properties.getProperty("url");
        user = properties.getProperty("user");
        pass = properties.getProperty("pass");
        // Get the data connection
        conn = getConnection();
        String insertSql = "insert into user_behavior values(? ,? ,? ,? ,? ,? ,? ,?) ;";
        pps = conn.prepareStatement(insertSql);
    }

    /** * close the connection */
    @Override
    public void close(a) {

        if(conn ! =null) {
            try {
                conn.close();
            } catch(SQLException e) { e.printStackTrace(); }}if(pps ! =null) {
            try {
                pps.close();
            } catch(SQLException e) { e.printStackTrace(); }}}/** * Call the invoke() method to insert data **@param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(UserBehavior value, Context context) throws Exception {
        pps.setLong(1, value.userId);
        pps.setLong(2, value.itemId);
        pps.setInt(3, value.catId);
        pps.setInt(4, value.merchantId);
        pps.setInt(5, value.brandId);
        pps.setString(6, value.action);
        pps.setString(7, value.gender);
        pps.setLong(8, value.timestamp);
        pps.executeUpdate();
    }
    /** * get database connection **@return
     * @throws SQLException
     */
    public Connection getConnection(a) throws IOException {
        Connection connnection = null;

        try {
            // Load the driver
            Class.forName(driver);
            // Get the connection
            connnection = DriverManager.getConnection(
                    url,
                    user,
                    pass);
        } catch (Exception e) {
            e.printStackTrace();
        }
        returnconnnection; }}Copy the code

About RichFunction

Careful readers can notice that in the previous operator operation cases, RichFunction is used. In many cases, some initialization operations or obtaining the context information of the function are required before the function processes data. DataStream API provides a class of RichFunction. This function provides a lot of additional functionality over normal functions.

When using RichFunction, you can implement two additional methods:

  • Open (), which is the initialization method, is called once before each character first calls the transformation method (such as map). Note that the Configuration parameter is only used in the DataSet API, but not in the DataStream API. Therefore, it can be ignored when the DataStream API is used.
  • Close (), the function’s termination method, is called once for each task after the last call to the transformation method, usually for operations such as resource release.

The getRuntimeContext() method also allows users to access function context information (RuntimeContext), such as the parallelism of the function, the number of the function’s subtask, and the name of the task executing the function, as well as the partition status.

conclusion

This paper first realizes the custom MySQL Source, then carries out a series of operator operations based on the MySQL Source, and makes a detailed analysis of common operator operations, finally realizes a custom MySQL Sink, and explains the RichFunction.

Code address :github.com/jiamx/study…

The public account “Big Data Technology and Data Warehouse” replies to “information” to receive the big data data package