Welcome to my GitHub

Github.com/zq2599/blog…

Content: categorized summaries of all original articles with companion source code, covering Java, Docker, Kubernetes, DevOps, and more;

About the CoProcessFunction practice trilogy

  • The CoProcessFunction practice trilogy aims to learn and master the use of Flink's low-level processing function CoProcessFunction through three hands-on sessions.
  • The series begins with an introduction to CoProcessFunction, then moves quickly into practice to understand its basic capabilities.
  • The next article brings in state, so that when elements of one stream are processed, the other stream's data can be taken into account;
  • The final installment adds timers, ensuring that data for the same key is processed promptly in dual-stream scenarios.

Version information

  1. Development environment OS: macOS Catalina 10.15.3 (MacBook Pro 13-inch)
  2. Development tool: IDEA ULTIMATE 2018.3
  3. JDK: 1.8.0_211
  4. Maven: 3.6.0
  5. Flink: 1.9.2

Links to articles

  1. Basic functions
  2. State handling
  3. Timer and side output

About CoProcessFunction

  • The role of CoProcessFunction is to process data from two sources simultaneously;
  • For example, the job in the figure below listens on ports 9998 and 9999 at the same time, processes the data received from each port separately, and then hands both off to the same sink (printing):

  • Flink's way of supporting this is to have you extend CoProcessFunction. To get a clearer picture, let's look at the class diagrams of KeyedProcessFunction and CoProcessFunction together, as shown below:

  • CoProcessFunction has the same inheritance hierarchy as KeyedProcessFunction, and CoProcessFunction itself is very simple: it is enough to handle the data arriving from the two upstream streams in processElement1 and processElement2, and timers are supported as well;
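Before the full Flink code below, the shape of that contract can be sketched in plain Java. This is only an illustration of the idea, not Flink's API — the interface and all names here are invented: two independently typed callbacks feed one shared output type.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the CoProcessFunction contract (NOT Flink's API):
// two independently typed inputs, one shared output type.
interface TwoInputProcessor<IN1, IN2, OUT> {
    void processElement1(IN1 value, List<OUT> out);
    void processElement2(IN2 value, List<OUT> out);
}

public class ContractSketch {

    static List<String> run() {
        List<String> out = new ArrayList<>();
        TwoInputProcessor<Integer, Double, String> p = new TwoInputProcessor<Integer, Double, String>() {
            @Override
            public void processElement1(Integer value, List<String> out) {
                out.add("stream1:" + value);
            }
            @Override
            public void processElement2(Double value, List<String> out) {
                out.add("stream2:" + value);
            }
        };
        // Elements from either "stream" end up in the same downstream collection.
        p.processElement1(123, out);
        p.processElement2(4.5, out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [stream1:123, stream2:4.5]
    }
}
```

The point of the sketch is only the type signature: IN1 and IN2 may differ, but both callbacks emit the same OUT type to the same downstream.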

Overview of this practice session

The application we are going to develop is very simple, described as follows:

  1. Create two data sources, reading data from local ports 9998 and 9999 respectively;
  2. Each port receives data such as aaa,123, which is converted into a Tuple2 instance with f0 set to "aaa" and f1 set to 123;
  3. In the CoProcessFunction implementation class, the data from each source is logged and then passed on to the downstream operator;
  4. The downstream operation is printing, so all data received on ports 9998 and 9999 is printed to the console;
  5. The demo's functionality is illustrated below:

  • Next, let's start coding;
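The core conversion in step 2 above — turning a line like aaa,123 into a key/count pair — can be tried out without Flink at all. Here is a plain-Java sketch of the same logic; the class and method names are illustrative, and the real operator appears later in this article.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class LineParser {

    // Parse a line like "aaa,123" into (key="aaa", count=123).
    // Returns null for blank or malformed lines, mirroring how the
    // map operator in this article drops invalid input.
    static Map.Entry<String, Integer> parse(String line) {
        if (line == null || line.trim().isEmpty()) {
            return null;
        }
        String[] array = line.split(",");
        if (array.length < 2) {
            return null;
        }
        return new SimpleEntry<>(array[0], Integer.valueOf(array[1]));
    }

    public static void main(String[] args) {
        System.out.println(parse("aaa,123"));  // aaa=123
        System.out.println(parse("bad-line")); // null
    }
}
```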

Download the source code

If you don't want to write the code yourself, the source for the whole series can be downloaded from GitHub; the address and link information are as follows (github.com/zq2599/blog…

| Name | Link | Note |
| --- | --- | --- |
| Project home page | Github.com/zq2599/blog… | The project's home page on GitHub |
| Git repository address (HTTPS) | Github.com/zq2599/blog… | The project's source repository address, HTTPS protocol |
| Git repository address (SSH) | git@github.com:zq2599/blog_demos.git | The project's source repository address, SSH protocol |

The Git project contains multiple folders; the code for this chapter is in the FlinkStudy folder, as shown in the red box below:

Overview of the code

  1. Develop a map operator to convert strings into Tuple2 instances;
  2. Develop the abstract class AbstractCoProcessFunctionExecutor, whose responsibilities include: listening on ports, converting the received data, connecting the two streams, invoking the CoProcessFunction operator, and printing the processed results of the two streams;
  3. As the description above shows, AbstractCoProcessFunctionExecutor does a lot, but not the concrete business logic of the connected streams; that part is left to subclasses. The focus of the whole trilogy is on the subclasses of AbstractCoProcessFunctionExecutor, i.e. the business logic that runs after the two streams are connected. In the figure below, the red CoProcessFunction part is the business code; everything else is done in the abstract class:
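The structure just described is essentially the template method pattern: the abstract class fixes the pipeline, and a subclass supplies only the business step. A minimal Flink-free sketch of that shape (all names here are invented for illustration):

```java
// Template method pattern sketch (illustrative names, not this article's Flink code):
// the abstract class fixes the pipeline, the subclass supplies the per-element logic.
abstract class AbstractExecutor {

    // The one step left to subclasses, analogous to getCoProcessFunctionInstance().
    protected abstract String process(String element);

    // The fixed pipeline: consume input, apply the subclass's logic, collect output.
    public final StringBuilder execute(String[] input) {
        StringBuilder sink = new StringBuilder();
        for (String element : input) {
            sink.append(process(element)).append('\n');
        }
        return sink;
    }
}

public class UppercaseExecutor extends AbstractExecutor {

    @Override
    protected String process(String element) {
        return element.toUpperCase();
    }

    public static void main(String[] args) {
        // prints "AAA" and "BBB", each on its own line
        System.out.print(new UppercaseExecutor().execute(new String[]{"aaa", "bbb"}));
    }
}
```

The benefit is the same as in the article's abstract class: the three installments of this series can each swap in different business logic without touching the pipeline code.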

The Map operator

  1. Create a map operator that converts a string like aaa,123 into a Tuple2 instance with f0="aaa" and f1=123;
  2. The operator is WordCountMap.java:
package com.bolingcavalry.coprocessfunction;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.StringUtils;

public class WordCountMap implements MapFunction<String, Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> map(String s) throws Exception {

        if(StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");
            return null;
        }

        String[] array = s.split(",");

        if(null==array || array.length<2) {
            System.out.println("invalid line for array");
            return null;
        }

        return new Tuple2<>(array[0], Integer.valueOf(array[1]));
    }
}

The abstract class

  • The source of the abstract class AbstractCoProcessFunctionExecutor.java is shown below; a few key points are explained afterwards:
package com.bolingcavalry.coprocessfunction;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;

/**
 * @author will
 * @email [email protected]
 * @date 2020-11-09
 * @description An execution class that strings the whole flow together, to experience CoProcessFunction
 */
public abstract class AbstractCoProcessFunctionExecutor {

    /**
     * Returns a CoProcessFunction instance; implementation is left to subclasses
     * @return
     */
    protected abstract CoProcessFunction<
            Tuple2<String, Integer>,
            Tuple2<String, Integer>,
            Tuple2<String, Integer>> getCoProcessFunctionInstance();

    /**
     * Listens on the specified port,
     * converts the received data into Tuple2 instances via map,
     * assigns timestamps to the elements,
     * then partitions by f0,
     * and returns the resulting KeyedStream
     * @param port
     * @return
     */
    protected KeyedStream<Tuple2<String, Integer>, Tuple> buildStreamFromSocket(StreamExecutionEnvironment env, int port) {
        return env
                // Listen on the port
                .socketTextStream("localhost", port)
                // The resulting string "aaa,3" is converted into an instance of Tuple2, f0="aaa", f1=3
                .map(new WordCountMap())
                // Partition words as keys
                .keyBy(0);
    }

    /**
     * If a subclass has side output to process, it overrides this method,
     * which is called after the main flow has finished executing
     */
    protected void doSideOutput(SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream) {
    }

    /**
     * Executes the whole business flow
     * @throws Exception
     */
    public void execute() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parallelism 1
        env.setParallelism(1);

        // Listen on port 9998
        KeyedStream<Tuple2<String, Integer>, Tuple> stream1 = buildStreamFromSocket(env, 9998);

        // Listen on port 9999
        KeyedStream<Tuple2<String, Integer>, Tuple> stream2 = buildStreamFromSocket(env, 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = stream1
                // Two streams are connected
                .connect(stream2)
                // Execute low-level processing functions. The processing logic is implemented in subclasses
                .process(getCoProcessFunctionInstance());

        // Print all the elements output by the lower-order processing function
        mainDataStream.print();

        // Side output related logic, subclasses have side output needs to override this method
        doSideOutput(mainDataStream);

        // Execute
        env.execute("ProcessFunction demo : CoProcessFunction");
    }
}
  • Key point 1: there are two data sources, and the processing logic for each source is wrapped in the buildStreamFromSocket method;
  • Key point 2: stream1.connect(stream2) connects the two streams;
  • Key point 3: process receives the CoProcessFunction instance, which is where the processing logic of the merged streams lives;
  • Key point 4: getCoProcessFunctionInstance is an abstract method that returns a CoProcessFunction instance; it is left to subclasses, so what the CoProcessFunction does is entirely determined by the subclass;
  • Key point 5: the doSideOutput method does nothing by itself but is called at the end of the main flow. If a subclass needs side output, it overrides this method; the method's input parameter is the processed data set, from which the side output can be obtained.
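As an aside on key point 1: the keyBy(0) call inside buildStreamFromSocket guarantees that elements with the same f0 value always reach the same parallel task. Here is a much-simplified, Flink-free illustration of that determinism; Flink's real implementation hashes keys into key groups, so this modulo sketch is not its actual algorithm and the class name is invented.

```java
public class KeyPartitionSketch {

    // Simplified illustration of keyed partitioning: the same key always maps
    // to the same parallel task. (Flink really hashes keys into key groups; this
    // hashCode-modulo version only illustrates the determinism that keyBy gives.)
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // "aaa" lands in the same partition on every call, run after run.
        System.out.println(partitionFor("aaa", 4) == partitionFor("aaa", 4)); // true
    }
}
```

This determinism is what later installments of the trilogy rely on: keyed state and timers for a given key live in the task that owns that key.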

Subclass that operates on the connected streams

  1. The subclass CollectEveryOne.java is shown below. Its logic is very simple: every element from either upstream source is output directly to the downstream operator:
package com.bolingcavalry.coprocessfunction;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CollectEveryOne extends AbstractCoProcessFunctionExecutor {

    private static final Logger logger = LoggerFactory.getLogger(CollectEveryOne.class);

    @Override
    protected CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> getCoProcessFunctionInstance() {
        return new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            @Override
            public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) {
                logger.info("Handle elements of stream 1: {},", value);
                out.collect(value);
            }

            @Override
            public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) {
                logger.info("Handle elements of stream 2: {}", value);
                out.collect(value);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        new CollectEveryOne().execute();
    }
}
  1. In the code above, the generic declaration on CoProcessFunction is long: <Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>. The three Tuple2 types represent the input type of data source 1, the input type of data source 2, and the type of the output sent downstream.
  2. That's all the code; now let's run it and try it out;

Validation

  1. Open ports 9998 and 9999 on your machine. On my MacBook, I run nc -l 9998 and nc -l 9999 in two terminals;
  2. Run the CollectEveryOne.main method directly (I am on a Mac and have not tried this on Windows, but you can package it as a jar and deploy it to a cluster);
  3. Type aaa,111 into the console listening on port 9998 and bbb,222 into the one listening on port 9999;
  4. Below is the output from the Flink console. You can see that the logging code in the processElement1 and processElement2 methods was executed, and that the print operation, as the most downstream step, printed the data from both sources, as expected:
12:45:38,774 INFO CollectEveryOne - Handle elements of stream 1: (aaa,111),
(aaa,111)
12:45:43,816 INFO CollectEveryOne - Handle elements of stream 2: (bbb,222)
(bbb,222)
  • At this point, our first dual-stream low-level processing function is complete, and we have a basic understanding of CoProcessFunction. Of course, its real value goes far beyond this. In the next article we will bring state into play, so that processElement1 and processElement2 can see what the other stream has already processed: each element will then be handled in relation to the other stream instead of in isolation.

You are not alone: Xinchen's original articles accompany you all the way

  1. Java series
  2. Spring series
  3. Docker series
  4. Kubernetes series
  5. Database + middleware series
  6. Conversation series

Welcome to follow my WeChat official account: Programmer Xinchen

Search for "Programmer Xinchen" on WeChat. I am Xinchen, looking forward to exploring the Java world with you…

Github.com/zq2599/blog…