Welcome to my GitHub

Github.com/zq2599/blog…

Content: a categorized index of all original articles with companion source code, covering Java, Docker, Kubernetes, DevOps, and more;

Links to the Flink process function practice series

  1. Learn more about the state operations of ProcessFunction (Flink-1.10);
  2. ProcessFunction;
  3. The KeyedProcessFunction class;
  4. ProcessAllWindowFunction (window processing);
  5. CoProcessFunction (dual-stream processing);

About Process Functions

As shown in the figure below, in conventional business development the SQL, Table API, and DataStream API layers are commonly used, while the low-level Process Function layer is used relatively rarely. Starting with this chapter, we get familiar with Process Functions through hands-on practice: what can we do with this family of low-level operators?

About the ProcessFunction class

The ProcessFunction class inherits the RichFunction lifecycle methods open and close, and additionally defines the processElement and onTimer methods. Its common capabilities are as follows:

  1. Processing individual elements;
  2. Accessing each element's timestamp;
  3. Side outputs;

Next, we will write two applications to experience these features.

Version information

  1. Development environment operating system: MacBook Pro 13 inch, macOS Catalina 10.15.3
  2. Development tool: IDEA ULTIMATE 2018.3
  3. JDK: 1.8.0_211
  4. Maven: 3.6.0
  5. Flink: 1.9.2

Download the source code

If you don’t want to write the code yourself, the source for the whole series can be downloaded from GitHub; the addresses and link information are as follows (github.com/zq2599/blog…

Name | Link | Note
--- | --- | ---
Project home page | Github.com/zq2599/blog… | The project’s home page on GitHub
Git repository address (HTTPS) | Github.com/zq2599/blog… | The project’s source repository address, HTTPS protocol
Git repository address (SSH) | git@github.com:zq2599/blog_demos.git | The project’s source repository address, SSH protocol

The Git project contains multiple folders; the application for this chapter is in the FlinkStudy folder, as shown in the red box below:

Create a project

Run the following command to create a flink-1.9.2 application project:

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.9.2

Enter the groupId com.bolingcavalry and the artifactId flinkdemo when prompted.

The first demo

The first demo demonstrates the following two features:

  1. Processing individual elements;
  2. Access timestamp;

Create Simple.java with the following content:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class Simple {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Parallelism is 1
        env.setParallelism(1);

        // Set up the data source
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                for (int i = 1; i < 4; i++) {
                    String name = "name" + i;
                    Integer value = i;
                    long timeStamp = System.currentTimeMillis();

                    // Print the data and timestamp so the output can be verified later
                    System.out.println(String.format("Source, %s, %d, %d\n",
                            name,
                            value,
                            timeStamp));

                    // Emit an element with a timestamp attached
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, value), timeStamp);

                    // Sleep 10 ms between emissions so each element gets a different timestamp
                    Thread.sleep(10);
                }
            }

            @Override
            public void cancel() {}
        });

        // Filter out elements with odd values
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        // Elements whose f1 field is odd do not reach the next operator
                        if (0 == value.f1 % 2) {
                            out.collect(String.format("ProcessElement, %s, %d, %d\n", value.f0, value.f1, ctx.timestamp()));
                        }
                    }
                });

        // Print the result, proving that each element's timestamp is indeed available inside ProcessFunction
        mainDataStream.print();

        env.execute("processfunction demo : simple");
    }
}

Here’s a walkthrough of the code above:

  1. Create a data source that emits one element every 10 milliseconds; there are three elements in total, of type Tuple2: f0 is a string, f1 is an integer, and each element carries a timestamp;
  2. As the data source emits each element, it first prints the element's f0, f1, and timestamp, so they can be compared with the downstream output;
  3. Create an anonymous subclass of ProcessFunction that processes each element sent from upstream, can read each element's timestamp (an important capability), and filters out elements whose f1 field is odd;
  4. Finally, print the data processed by ProcessFunction to verify that the results match expectations.
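Outside of Flink, the rule at the heart of step 3 is just a parity check on the f1 field. Here is a minimal plain-Java sketch of the same predicate (no Flink dependency; the class and method names are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the parity rule used in processElement above:
// only elements whose integer value is even are forwarded.
class ParityFilter {
    static List<String> keepEven(Map<String, Integer> elements) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : elements.entrySet()) {
            // Odd values are dropped, mirroring "0 == value.f1 % 2" in the demo
            if (e.getValue() % 2 == 0) {
                out.add(String.format("ProcessElement, %s, %d", e.getKey(), e.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> data = new LinkedHashMap<>();
        data.put("name1", 1);
        data.put("name2", 2);
        data.put("name3", 3);
        System.out.println(keepEven(data)); // only name2 passes the filter
    }
}
```

With the three elements the demo emits, only name2 (value 2) survives, which is exactly what the Flink job prints.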

Run the Simple class directly; the result is as follows:

The second demo

The second demo implements Side Outputs: from one DataStream, data can additionally be output to other operators through side outputs, without affecting the processing in the original operator. The code below demonstrates side outputs.
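One detail worth noting before the code: the tag is created as `new OutputTag<String>("side-output") {}`, with a trailing `{}`. Flink requires the tag to be an anonymous subclass so the element type can be recovered despite Java's type erasure. A minimal plain-Java sketch of that type-token trick (TypeTag here is a hypothetical stand-in, not Flink's class):

```java
import java.lang.reflect.ParameterizedType;

// Hypothetical stand-in for Flink's OutputTag, showing why the trailing {}
// matters: the anonymous subclass keeps the generic type argument
// recoverable at runtime despite erasure.
abstract class TypeTag<T> {
    final String capturedTypeName;

    TypeTag() {
        // getGenericSuperclass() of the anonymous subclass is TypeTag<String>,
        // so the element type can be read back via reflection
        ParameterizedType p = (ParameterizedType) getClass().getGenericSuperclass();
        capturedTypeName = p.getActualTypeArguments()[0].getTypeName();
    }
}

class TypeTagDemo {
    public static void main(String[] args) {
        TypeTag<String> tag = new TypeTag<String>() {}; // note the {}
        System.out.println(tag.capturedTypeName); // java.lang.String
    }
}
```

Without the `{}`, the generic parameter would be erased and the type information lost.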

Create SideOutput class:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
import java.util.List;

public class SideOutput {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parallelism is 1
        env.setParallelism(1);

        // Define the OutputTag
        final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

        // Create a List with three Tuple2 elements
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2<>("aaa", 1));
        list.add(new Tuple2<>("bbb", 2));
        list.add(new Tuple2<>("ccc", 3));

        // Create a DataStream from the List
        DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

        // Every element goes to mainDataStream; elements whose f1 field is odd also go to the side output
        SingleOutputStreamOperator<String> mainDataStream = fromCollectionDataStream
                .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

                        // Send to the next operator of the main flow
                        out.collect("main, name : " + value.f0 + ", value : " + value.f1);

                        // Elements with an odd f1 field also enter the side output
                        if (1 == value.f1 % 2) {
                            ctx.output(outputTag, "side, name : " + value.f0 + ", value : " + value.f1);
                        }
                    }
                });

        // Disable chaining so the original DAG can be seen clearly on the web UI
        mainDataStream.disableChaining();

        // Get the side-output data
        DataStream<String> sideDataStream = mainDataStream.getSideOutput(outputTag);

        mainDataStream.print();
        sideDataStream.print();

        env.execute("processfunction demo : sideoutput");
    }
}

Here’s a walkthrough of the code above:

  1. The data source is a collection of Tuple2 elements; the f0 field is a string and the f1 field is an integer;
  2. The anonymous ProcessFunction subclass concatenates each element's f0 and f1 into a string sent to the main-flow operator, and additionally sends elements whose f1 field is odd to the side output;
  3. Chaining is disabled so that the original DAG can be seen clearly on Flink's web UI;
  4. Print the elements from both the main flow and the side output to verify that the results match expectations.
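The routing rule described above can be sketched in plain Java without any Flink dependency (names like SideRouting and process are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the routing rule in the demo: every element goes
// to the main output, and odd-valued elements are additionally routed
// to the side output.
class SideRouting {
    final List<String> main = new ArrayList<>();
    final List<String> side = new ArrayList<>();

    void process(String name, int value) {
        // Every element reaches the main flow, like out.collect(...) in the demo
        main.add("main, name : " + name + ", value : " + value);
        // Odd f1 values also go to the side output, like ctx.output(outputTag, ...)
        if (value % 2 == 1) {
            side.add("side, name : " + name + ", value : " + value);
        }
    }

    public static void main(String[] args) {
        SideRouting r = new SideRouting();
        r.process("aaa", 1);
        r.process("bbb", 2);
        r.process("ccc", 3);
        System.out.println(r.main.size() + " main records, " + r.side.size() + " side records");
    }
}
```

Feeding it the same three elements as the demo yields three main records and two side records, matching the expected Flink output.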

Execute SideOutput and check the result, shown in the figure below: records with the main prefix come from the main-flow operator (three in total), while records with the side prefix come from the side output (only the two records whose f1 field is odd), which matches expectations. The steps above were all performed in IDEA; Flink can also be deployed standalone, with the project built into a jar and submitted to Flink's JobManager. The resulting DAG looks like this:

That completes our practice with the ProcessFunction class, the simplest of the process functions; we'll try more types of process functions in the next article.

Welcome to follow my WeChat official account: Programmer Xinchen

Search "Programmer Xinchen" on WeChat. I'm Xinchen, looking forward to exploring the Java world with you…

Github.com/zq2599/blog…