Hi, I’m a veteran.

Flink is based on a stream programming model and has many powerful built-in operators that can help us develop applications quickly.

As an experienced Flink developer, I find the usage and typical scenarios of most operators easy enough to understand, but small problems still crop up in practice:

  1. Some operators go unused for a long time, and their usage is forgotten.
  2. Which operators should be chosen for a given scenario, and how to choose between them, remains fuzzy.

To do a good job, you must first sharpen your tools. Using the right operators quickly and efficiently when developing a program often achieves twice the result with half the effort.

Since a good memory is no match for good notes, I have put together this manual of common Flink operators. It also serves as my own working notes. Feel free to bookmark it.

1 DataStream API

The Flink DataStream API allows users to write Flink streaming programs flexibly and efficiently. It consists of the DataSource, Transformation, and DataSink modules.

  • The Source module defines data access functionality, covering both built-in and external data sources.
  • The Transformation module defines conversion operations on DataStream data flows.
  • The Sink module defines data output functionality, storing results to external storage media.

The execution environment: StreamExecutionEnvironment

System modules: DataSource, Transformation, and DataSink
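
To make the three modules concrete, here is a minimal end-to-end sketch (not from the original article; the socket source, host, and port are assumptions) wiring a source, a transformation, and a sink together:

import org.apache.flink.streaming.api.scala._

object MinimalJob {
  def main(args: Array[String]): Unit = {
    // Execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // DataSource: read lines from a socket (assumed host/port)
    val source: DataStream[String] = env.socketTextStream("localhost", 9999)

    // Transformation: split lines into words and count per word
    val counts = source
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // DataSink: print results to stdout
    counts.print()

    env.execute("minimal datastream job")
  }
}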

2 DataSource input

DataSource input module defines data input operations in DataStream API. Flink input data sources are divided into built-in data sources and third-party data sources.

  • Built-in data sources include files, Socket network ports, and collection-type data. They are implemented inside the Flink system and require no additional dependency libraries.
  • Third-party data sources define the data interaction logic between Flink and external systems, for example the Apache Kafka Connector, Elasticsearch Connector, and so on.
  • Users can also define custom data sources.

2.1 readTextFile and readFile operator

Supports reading text files into the Flink system and converting them into a DataStream data set.

  • The readTextFile operator reads plain text files (.log, .txt, …) directly.
  • The readFile operator reads files of a specific data type (CSV, JSON, or a custom InputFormat) by specifying an InputFormat.
// Read the text file
val textInputStream = env.readTextFile("/data/example.log")

// Specify an InputFormat to read CSV files
val csvInputStream = env.readFile(
  // You can customize the type (InputFormat)
  new CsvInputFormat[String](new Path("/data/example.csv")) {
    override def fillRecord(out: String, objects: Array[AnyRef]): String = {
      null
    }
  },
  "/data/example.csv"
)

2.2 Socket operator

Data can be read from a Socket port and converted into a DataStream.

  • Operator parameters: IP address, port, string delimiter, and maximum retry count maxRetry
  • maxRetry provides a reconnection mechanism for task failures. When set to 0, the Flink task stops once the socket is down.
  • On Unix, run nc -lk [:port] to start a network service for testing
// Flink reads Socket port (9999) data
val socketDataStream = 
   env.socketTextStream("localhost", 9999)
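
As a reference, a minimal sketch (an assumption, not from the original article) passing the delimiter and maxRetry parameters explicitly, assuming the Scala API where the delimiter is a Char:

// Sketch: explicit delimiter and reconnection setting (assumed parameters)
// maxRetry = 0 stops the task as soon as the socket goes down; a negative value retries forever
val socketStreamWithRetry =
   env.socketTextStream("localhost", 9999, '\n', maxRetry = 3)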

2.3 Collection operator

Supports converting Flink built-in collection classes (collections) into a DataStream.

  • Supports the common Java and Scala collection classes
  • Essentially, local collection data is distributed to remote parallel execution; suitable for local testing, and note that data structure types must remain consistent
// fromElements element collection conversion
val elementDataStream = 
 env.fromElements(
   Tuple2("aa", 1L), Tuple2("bb", 2L)
 )

// fromCollection array conversion (Java)
String[] collections = new String[] {
  "aa", "bb"
};
DataStream<String> collectionDatastream =
 env.fromCollection(
  Arrays.asList(collections)
 );

// List conversion (Java)
List<String> arrays = new ArrayList<>();
arrays.add("aa");
arrays.add("bb");
DataStream<String> arrayDataStream = 
 env.fromCollection(arrays);

2.4 External data source operator

Data can be read from third-party data sources and converted into a DataStream.

  • Common external data source operators include Hadoop FileSystem, ElasticSearch, Apache Kafka, and RabbitMQ
  • Jar dependencies (POM) need to be added to the Maven environment
// Maven configuration
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
  <version>1.9.1</version>
</dependency>

// Read Kafka data source (Java)
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "localhost:9092");
...
DataStream<String> kafkaStream = 
  env.addSource(
    new FlinkKafkaConsumer010<>(
      "topic-1",
      new SimpleStringSchema(),
      prop
    )
  );

2.5 Customizing data source operators

Custom data sources are supported by implementing the relevant built-in source interfaces.

Specific built-in methods include but are not limited to:

  • SourceFunction interface
  • ParallelSourceFunction interface
  • RichParallelSourceFunction class

The custom source is then registered via env’s addSource() method. The full implementation is not expanded here, but a minimal sketch follows.
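
A minimal sketch (not from the original article; class and field names are illustrative) of a custom source implementing the SourceFunction interface, emitting an incrementing counter once per second until cancelled:

import org.apache.flink.streaming.api.functions.source.SourceFunction

// Custom source: emits 0, 1, 2, ... once per second
class CounterSource extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var counter = 0L
    while (running) {
      ctx.collect(counter)   // emit one element downstream
      counter += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

// Register the custom source with the environment
val customStream = env.addSource(new CounterSource)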

3 DataStream conversion

Flink applies operations to one or more DataStreams to generate a new DataStream; this process is known as Transformation.

Most of the logic in a Flink program is done in Transformations, including conversion, filtering, sorting, joining, association, selection, and aggregation.

Note that Flink's Transformations differ from Spark's transformations.

DataStream conversions in Flink can be divided into several types:

  • Single DataStream: Processing logic of a single DataStream dataset element
  • Multi DataStream: Processing logic for multiple DataStream data set elements
  • Physical partitions: data set parallelism and data partition handling (a brief sketch of common partitioning operators follows this list)
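
The article does not expand on physical partitioning, so here is a minimal sketch (not from the original; dataStream stands for any existing DataStream) of the standard repartitioning operators:

// Physical partitioning: redistribute elements across parallel subtasks
val shuffled    = dataStream.shuffle     // random redistribution
val rebalanced  = dataStream.rebalance   // round-robin, evens out data skew
val rescaled    = dataStream.rescale     // round-robin within local subtask groups
val broadcasted = dataStream.broadcast   // every element is sent to every downstream subtask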

3.1 Map Operator (#Single)

Transform each element of the dataset to generate a new DataStream.

  • The underlying implementation is the MapFunction operator; the map function is invoked on every element.
  • It is often used for data cleaning, calculation, and conversion.
val inputStream = env.fromElements(
  ("aa", 1), ("bb", 2), ("cc", 3))

// The map operation adds 1 to the second field of each element
val mapStream1 = inputStream.map(
  t => (t._1, t._2 + 1))

// The second way: specify a MapFunction
val mapStream2 = inputStream.map(
 new MapFunction[(String, Int), (String, Int)] {
   override def map(t: (String, Int)): (String, Int) = {
     (t._1, t._2 + 1)
   }
 })

3.2 FlatMap Operator (#Single)

Each element in a data set can be converted into zero or more elements, generating a new DataStream.

val flatDataStream = env.fromCollection(...)
val resultStream = flatDataStream.flatMap {
   line => line.split(",")
}

3.3 Filter operator (#Single)

Supports filtering data set elements by a condition to generate a new DataStream.

// Wildcard (placeholder) syntax
val filterDataStream = dataStream.filter {
 _ % 2 == 0
}

// Specify an explicit expression
val filterDS = dataStream.filter(
    x => x % 2 == 0
)

3.4 keyBy operator (#Single)

Partitions DataStream data sets based on the specified Key to generate a new KeyedStream

  • Data with the same Key value is merged into the same partition
  • Similar to groupByKey in Spark
val inputStream = env.fromElements(
 ("aa", 11), ("aa", 22), ("bb", 33))

// Use the first field as the partition key
// Converts to KeyedStream[(String, Int), Tuple]
val keyedStream = inputStream.keyBy(0)

3.5 Reduce operator (#Single)

Supports aggregating a KeyedStream with reduce() to generate a DataStream

  • KeyedStream is aggregated according to the key partition
  • Support for operators and custom reduceFunc functions
val inputStream = env.fromElements(
 ("aa", 11), ("bb", 33), ("cc", 22), ("aa", 21))

// Specify the first field as the partition key
val keyedStream = inputStream.keyBy(0)

// Sum the second field
val reduceDataStream = keyedStream.reduce {
  (t1, t2) => (t1._1, t1._2 + t2._2)
}

To customize Reduce functions, implement anonymous classes.

val reduceDataStream = keyedStream.reduce(
  new ReduceFunction[(String, Int)] {
     override def reduce(t1: (String, Int),
        t2: (String, Int)): (String, Int) = {
          (t1._1, t1._2 + t2._2)
        }
  }
)

3.6 Aggregations operator (#Single)

Basic DataStream aggregation operators, applied to a KeyedStream to generate a new DataStream

  • Aggregates on a specified field; custom aggregation logic is also supported
  • The underlying implementation encapsulates functions such as sum, min, and max
val inputStream = env.fromElements(
  (1, 7), (2, 8), (3, 11), (2, 3))

// Specify the first field as the partition key
val keyedStream:
 KeyedStream[(Int, Int), Tuple] = inputStream.keyBy(0)

// Sum the second field
val sumStream = keyedStream.sum(1)

// Finally output the result
sumStream.print()

3.7 Connect Merge operator (#Multi)

A ConnectedStream is generated by merging datasets of multiple types and preserving the data types of the original datasets

  • The two streams share state and can access each other's data set state
  • In some scenarios it can replace the join operator, effectively implementing a dual-stream join in Flink
// Create data sets of different data types
val stream1 = env.fromElements(
  ("aa", 3), ("bb", 4), ("cc", 11), ("dd", 22))
val stream2 = env.fromElements(1, 2, 11, 8)

// Connect the two data sets
// The resulting element types are [(String, Int), Int]
// e.g. ("aa", 3) and 1
val connectedStream = stream1.connect(stream2)

3.8 Connect operator – CoMap (#Multi)

The Map operator for a ConnectedStream, which maps the elements of both data sets into a merged output

  • Define a CoMapFunction object, specifying the two input data types, the output data type, and the mapping functions
  • The two sub-map functions, map1 and map2, are invoked for elements of their respective input streams to produce the final merged target data set
// Operate on the ConnectedStream produced by connect above
// The first type parameter is stream1's element type, the second is stream2's,
// and the third is the output element type
val resultStream = connectedStream.map(
  new CoMapFunction[(String, Int), Int, (Int, String)] {
    // Processing logic for the first data set, with input from stream1
    override def map1(in1: (String, Int)): (Int, String) = {
       (in1._2, in1._1)
    }
    // Processing logic for the second data set, with input from stream2
    override def map2(in2: Int): (Int, String) = {
       (in2, "default")
    }
  })

3.9 Connect Operator – CoFlatMap (#Multi)

The FlatMap operator for a ConnectedStream

Specify a CoFlatMapFunction in the flatMap() method and implement the flatMap1() and flatMap2() functions respectively.

val resultStream2 = connectedStream.flatMap(
  new CoFlatMapFunction[(String, Int), Int, (String, Int, Int)] {
     // For example, a shared variable is used to merge the two data sets
     var value = 0

     // Processing logic for the first data set
     override def flatMap1(in1: (String, Int),
      collect: Collector[(String, Int, Int)]): Unit = {
        collect.collect((in1._1, in1._2, value))
      }

     // Processing logic for the second data set
     override def flatMap2(in2: Int,
      collect: Collector[(String, Int, Int)]): Unit = {
        value = in2
      }
  }
)

3.10 Union Operator (#Multi)

Combine two or more data sets to generate DataStream of the same type as the input data set

  • The data types of the input data sets must be consistent
  • The data type of the output data set is the same as the input data
  • Note the difference from the connect operator
val stream1 = env.fromElements(
  ("aa", 3), ("bb", 22), ("cc", 45))
val stream2 = env.fromElements(
 ("dd", 23), ("ff", 21), ("gg", 89))
val stream3 = ...

// Merge the data sets
val unionStream = stream1.union(stream2)
val unionStream2 = stream1.union(
  stream2, stream3
)

3.11 Split operator (#Multi)

Splits a DataStream data set according to conditions so that it can later be separated into multiple DataStreams

  • Routes incoming data to multiple output data sets; the split logic is defined in the split function
  • Can be seen as a reverse implementation of Union
val stream1 = env.fromElements(
 ("aa", 3), ("bb", 33), 
 ("cc", 56), ("aa", 23), ("cc", 67))

// Tag the data according to the parity of the second field (split)
val splitStream = stream1.split(
 v => if (v._2 % 2 == 0) Seq("even")
      else Seq("odd"))

3.12 Select operator (#Multi)

The Select operator picks data set elements by their split tags, generating a new DataStream

// Select the even-tagged data
val evenStream = splitStream.select("even")

// Select all data
val allStream = splitStream.select("even", "odd")

3.13 Window Operator (Time mechanism)

Flink's window operators are at the core of real-time computation and are often used for metric statistics within a fixed time range

1) Window API

Flink provides high-level window API operators that encapsulate the low-level window operations, including window types, triggers, side outputs, and so on. Depending on the upstream input stream, windows are divided into Non-Keyed and Keyed types.

  • Non-Keyed (the upstream is a non-keyed stream)

Call windowAll() directly to get global statistics

val inputStream: DataStream = ...

// Call window() when the incoming stream is a KeyedStream
inputStream.keyBy(0).window(new WindowFunc(...))

// When a non-keyed input stream is passed in, no key partitioning is done;
// use windowAll() directly for global statistics
inputStream.windowAll(new WindowFunc(...))
  • Keyed (the upstream is a KeyedStream)

Call the built-in window() method on the KeyedStream

stream.keyBy(...)                     // keyed input stream
   .window(...)                       // window type
   .trigger(...)                      // trigger <optional>
   .evictor(...)                      // evictor <optional>
   .allowedLateness(...)              // late-data handling <optional>
   .sideOutputLateData(...)           // side output <optional>
   .reduce/fold/aggregate/apply(...)  // window compute function

2) Window type

By window assignment mode there are tumbling, sliding, session, and global windows, each supporting a different windowing behaviour and scope.

Both event-time and processing-time data streams are supported.

  • Tumbling Window

  • Sliding Window

  • Session Window

Example: statistics over a 10-second tumbling window:

val tumblingStream = inputStream
  .keyBy(0)
  .window(
    TumblingEventTimeWindows.of(
    Time.seconds(10))
  ).process(...)
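
For comparison, a minimal sketch (not from the original article) of the sliding and session window types using the standard event-time assigners:

// Sliding window: 10-minute windows, sliding every 1 minute
val slidingStream = inputStream
  .keyBy(0)
  .window(
    SlidingEventTimeWindows.of(
    Time.minutes(10), Time.minutes(1))
  ).process(...)

// Session window: a new session starts after 5 minutes of inactivity
val sessionStream = inputStream
  .keyBy(0)
  .window(
    EventTimeSessionWindows.withGap(
    Time.minutes(5))
  ).process(...)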

4 DataSink output

Flink reads from data sources and, after a series of Transform operations, the results are generally written to external storage media or sent downstream; this is Flink's DataSink process.

Flink encapsulates the connection logic for external storage in Connectors. Common examples are:

  • Apache Kafka
  • ElasticSearch
  • Hadoop FileSystem
  • Redis
  • File system and port

4.1 File | Port

Supports output to files, the client, and Socket network ports; these operators are built into Flink and require no third-party libraries

Commonly used are writeAsCsv (local file) and writeToSocket (Socket network)

// Local CSV file
inputStream.writeAsCsv(
 "file://path/xx.csv", WriteMode.OVERWRITE
)

// Socket network output
inputStream.writeToSocket(
 host, port, new SimpleStringSchema())

4.2 External Third Parties

Defined based on SinkFunction; the external third-party dependency library must be imported and the third-party system parameters configured

val dataStream = ...

// Define a FlinkKafkaProducer
val kafkaProducer = new FlinkKafkaProducer011[String](
  "localhost:9092",           // Kafka broker list
  "xxx-topic",                // Kafka topic
  new SimpleStringSchema()    // serialization schema
)

// Add the SinkFunction
dataStream.addSink(kafkaProducer)

5 concludes

Flink's built-in operator library is rich in types and powerful in function. Mastering how each operator is used and where it applies is an essential skill for real-time computing.

I will continue to update this series. You are welcome to add my personal WeChat account, Youlong525, to learn and exchange ideas together.

To be continued.

For more articles, follow the WeChat official account: Big Data Arsenal.