@[TOC]


Introduction to the layered API

Flink provides three different APIs, layered by level of abstraction. Each API has a different emphasis on simplicity and expressiveness, and is tailored to a different application scenario.

  • ProcessFunction is the lowest-level interface provided by Flink. A ProcessFunction can handle single events from one or two input data streams, or multiple events grouped within a particular window. It provides fine-grained control over time and state: developers can modify state at will and can register timers to trigger callbacks at some point in the future. As a result, you can use ProcessFunction to implement the complex, single-event-based business logic required by many stateful event-driven applications.
  • The DataStream API provides processing primitives for many common stream processing operations, including windows, record-by-record transformations, and external database lookups while processing events. The DataStream API supports Java and Scala and ships with predefined functions such as map(), reduce(), aggregate(), and so on. You can either extend the predefined interfaces or use Java or Scala lambda expressions to implement custom functions.
  • SQL & Table API: Flink supports two relational APIs, Table API and SQL. Both of these APIs are batch – and stream-unified APIs, which means that the relational API executes queries with the same semantics and produces the same results on the borderless real-time data stream and the borderless historical data stream. The Table API and SQL use Apache CalCite to parse, validate, and optimize queries. They integrate seamlessly with the DataStream and DataSet APIs and support user-defined scalar functions, aggregate functions, and table-valued functions.
  • Extension libraries

    • Complex event processing (CEP): Pattern detection is a very common use case in event stream processing. Flink's CEP library provides an API that lets users specify event patterns in terms of regular-expression-like constructs or state machines. The CEP library integrates with Flink's DataStream API to evaluate patterns on a DataStream. Applications of the CEP library include network intrusion detection, business process monitoring, and fraud detection.
    • DataSet API: The DataSet API is Flink's core API for batch applications. The basic operators provided by the DataSet API include Map, Reduce, (Outer) Join, Co-Group, Iterate, and so on. All operators are backed by algorithms and data structures that operate on serialized data in memory; if the data size exceeds the reserved memory, the excess data is spilled to disk. The data processing algorithms of Flink's DataSet API borrow from the implementations of traditional database algorithms, such as hybrid hash-join and external merge-sort.
    • Gelly: Gelly is a scalable graph processing and analysis library. Gelly is implemented on top of, and integrated with, the DataSet API, so it benefits from its scalable and robust operators. Gelly provides built-in algorithms such as Label Propagation, Triangle Enumeration, and PageRank, as well as a Graph API that simplifies the implementation of custom graph algorithms.
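To make the SQL & Table API layer concrete (see the note in the list above), here is a minimal sketch, assuming Flink 1.10 with the flink-table-api-scala-bridge and a table planner on the classpath; the stream contents, table name, and field names are made up for illustration.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TableApiSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // A small in-memory stream of (word, count) pairs
    val stream: DataStream[(String, Int)] = env.fromElements(("flink", 1), ("spark", 1), ("flink", 1))

    // Register the stream as a table and query it with SQL
    val table = tableEnv.fromDataStream(stream, 'word, 'cnt)
    tableEnv.createTemporaryView("t_wordcount", table)
    val result = tableEnv.sqlQuery("SELECT word, SUM(cnt) AS total FROM t_wordcount GROUP BY word")

    // A grouped aggregation on an unbounded stream produces updates, hence a retract stream
    tableEnv.toRetractStream[Row](result).print()
    env.execute()
  }
}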

The programming model of DataStream

The programming model of DataStream includes four parts: Environment, DataSource, Transformation and Sink.
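To make the four parts concrete, here is a minimal word-count sketch with each part labelled; the socket host and port are placeholders.

import org.apache.flink.streaming.api.scala._

object ProgrammingModelSketch {
  def main(args: Array[String]): Unit = {
    // 1. Environment: the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. DataSource: where the data comes from (a socket, as a placeholder)
    val source = env.socketTextStream("hadoop01", 8888)

    // 3. Transformation: the computation applied to the stream
    val counts = source.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // 4. Sink: where the results go (the console, here)
    counts.print()

    env.execute("programming-model-sketch")
  }
}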

2. Flink’s DataSource

2.1 File-based Source (HDFS in this case)

package com.chb.flink.source

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FileSource {
  def main(args: Array[String]): Unit = {
    // Initialize the Flink Streaming (stream) context execution environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Import the implicit conversions; importing them here avoids misleading error hints in IDEA
    import org.apache.flink.streaming.api.scala._

    // Read the data
    val stream = streamEnv.readTextFile("hdfs://10.0.0.201:9000/README.txt")

    // Transformation: word count
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // Print the result to the console
    result.print()

    // Start the streaming job; without this line the program above will not run
    streamEnv.execute("wordCount")
  }
}

2.2 Collection-based sources

This is a bit like parallelizing a local collection in Spark.

package com.chb.flink.source

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CollectionSource {
  def main(args: Array[String]): Unit = {
    // Initialize the Flink Streaming (stream) context execution environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Import the implicit conversions; importing them here avoids misleading error hints in IDEA
    import org.apache.flink.streaming.api.scala._

    // Read data from a collection
    val dataStream = streamEnv.fromCollection(Array(
      new StationLog("001", "186", "189", "busy", 1577071519462L, 0),
      new StationLog("002", "186", "188", "busy", 1577071520462L, 0),
      new StationLog("003", "183", "188", "busy", 1577071521462L, 0),
      new StationLog("004", "186", "188", "success", 1577071522462L, 32)))

    dataStream.print()
    streamEnv.execute()
  }
}

/**
 * Base-station call log data
 *
 * @param sid      base station ID
 * @param callOut  caller number
 * @param callIn   callee number
 * @param callType call type, e.g. fail, busy, barring, success
 * @param callTime call timestamp in milliseconds
 * @param duration call duration
 */
class StationLog(var sid: String, var callOut: String, var callIn: String, var callType: String, var callTime: Long, var duration: Long)

2.3, Kafka

First you need to add the Kafka connector dependency; many more connectors are available, see the official website for the full list.

2.3.1. Introducing Dependencies

<!-- Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.1</version>
    <exclusions>
        <!-- Exclude the transitive Jackson dependency -->
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

2.3.2 The first Kafka Source

package com.chb.flink.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSourceByString {
  def main(args: Array[String]): Unit = {
    // Initialize the Flink Streaming (stream) context execution environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Import the implicit conversions
    import org.apache.flink.streaming.api.scala._

    // Kafka configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "ShServer:9092")
    props.setProperty("group.id", "chb01")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("auto.offset.reset", "latest")

    val flinkKafkaConsumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props)
    val stream = streamEnv.addSource(flinkKafkaConsumer)
    stream.print()
    streamEnv.execute()
  }
}

2.3.3 The second Kafka Source

package com.chb.flink.source

import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaSourceByKeyValue {
  def main(args: Array[String]): Unit = {
    // Initialize the Flink Streaming (stream) context execution environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Import the implicit conversions
    import org.apache.flink.streaming.api.scala._

    // Kafka configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "ShServer:9092")
    props.setProperty("group.id", "fink02")
    props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    props.setProperty("auto.offset.reset", "latest")

    // Use Kafka as the data source, deserializing both key and value
    val stream = streamEnv.addSource(
      new FlinkKafkaConsumer[(String, String)]("test",
        new KafkaDeserializationSchema[(String, String)] {
          // Whether the stream has ended
          override def isEndOfStream(t: (String, String)) = false

          override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]) = {
            if (consumerRecord != null) {
              var key = "null"
              var value = "null"
              if (consumerRecord.key() != null) key = new String(consumerRecord.key(), "UTF-8")
              if (consumerRecord.value() != null) value = new String(consumerRecord.value(), "UTF-8")
              (key, value)
            } else {
              // If the Kafka record is empty, return a fixed tuple
              ("null", "null")
            }
          }

          // The produced type is a tuple of two strings
          override def getProducedType =
            createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
        }, props).setStartFromEarliest())

    stream.print()
    streamEnv.execute()
  }
}

2.3.3.1 Kafka producer test

package com.chb.flink.source

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Random

object MyKafkaProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "ShServer:9092")
    // Serializers
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    val random = new Random()
    while (true) {
      producer.send(new ProducerRecord[String, String]("test", "key" + random.nextInt(), "value" + random.nextInt()))
      Thread.sleep(1000)
    }
  }
}

2.4 Custom Source

There are two ways to implement a custom data Source:

  • Implement the SourceFunction interface to define a Source without parallelism (that is, with a parallelism of 1).
  • Implement the ParallelSourceFunction interface or extend RichParallelSourceFunction to define a Source with parallelism.

2.4.1 Implement a custom Source with SourceFunction

package com.chb.flink.source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

import scala.util.Random

/**
 * A custom data source. There are two ways to define one:
 * 1. Implement the SourceFunction interface to define a Source without parallelism (parallelism can only be 1).
 * 2. Implement the ParallelSourceFunction interface or extend RichParallelSourceFunction to define a Source with parallelism.
 */
class MyCustomerSource extends SourceFunction[StationLog] {
  // Whether the stream should keep running
  var flag = true

  /**
   * The main method that starts the Source.
   * Most of the time you need to implement a loop inside run() to keep producing data.
   *
   * @param sourceContext the context used to emit elements
   */
  override def run(sourceContext: SourceFunction.SourceContext[StationLog]): Unit = {
    val random = new Random()
    val types = Array("fail", "busy", "barring", "success")
    while (flag) { // Keep generating data while the stream has not been terminated
      1.to(5).map(i => {
        val callOut = "1860000%04d".format(random.nextInt(10000))
        val callIn = "1890000%04d".format(random.nextInt(10000))
        new StationLog("station_" + random.nextInt(10), callOut, callIn, types(random.nextInt(4)), System.currentTimeMillis(), 0)
      }).foreach(sourceContext.collect(_)) // Emit the data
      Thread.sleep(2000) // Sleep for 2 seconds between batches
    }
  }

  // Terminate the data stream
  override def cancel(): Unit = flag = false
}

object CustomerSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    import org.apache.flink.streaming.api.scala._
    val stream: DataStream[StationLog] = env.addSource(new MyCustomerSource)
    stream.print()
    env.execute()
  }
}
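The example above covers the first way (SourceFunction, parallelism fixed at 1). Below is a minimal sketch of the second way, a parallel source based on RichParallelSourceFunction; the class name and the generated StationLog values are illustrative assumptions.

import com.chb.flink.source.StationLog
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.util.Random

// A source with parallelism: each parallel instance runs its own copy of run()
class MyParallelSource extends RichParallelSourceFunction[StationLog] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[StationLog]): Unit = {
    val random = new Random()
    val types = Array("fail", "busy", "barring", "success")
    while (running) {
      ctx.collect(new StationLog(
        "station_" + random.nextInt(10),
        "1860000%04d".format(random.nextInt(10000)),
        "1890000%04d".format(random.nextInt(10000)),
        types(random.nextInt(4)),
        System.currentTimeMillis(), 0))
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}

// Usage: streamEnv.addSource(new MyParallelSource).setParallelism(4)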

3. Flink’s Sink data target

Flink provides a large number of ready-made data Sink targets for DataStream, including File, Kafka, Redis, HDFS, ElasticSearch, and more.

3.1 HDFS Sink

3.1.1 Configure the connector dependency to support the Hadoop FileSystem

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.10.1</version>
</dependency>

3.1.2、Streaming File Sink

https://ci.apache.org/project…

The Streaming File Sink writes data to HDFS in buckets, where each bucket corresponds to a directory in HDFS. By default records are bucketed by hour. Within a bucket, the output is further split into smaller part files according to a rolling policy, which prevents bucket files from growing too large. The rolling policy is configurable; the default policy rolls files based on file size and on a timeout, i.e. the time during which no new data has been written to a part file.

3.1.2.1 Rolling policies

  • DefaultRollingPolicy
  • CheckpointRollingPolicy

3.1.2.2 Bucket assignment policies

    • DateTimeBucketAssigner : Default time based assigner
    • BasePathBucketAssigner : Assigner that stores all part files in the base path (single global bucket)

Note that checkpointing must be enabled; otherwise the generated files will stay in the in-progress state.

3.1.2.3 Code implementation

package com.chb.flink.sink

import java.text.SimpleDateFormat
import java.util.Date

import com.chb.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object HDFSFileSink {
  def main(args: Array[String]): Unit = {
    // Initialize the Flink Streaming (stream) context execution environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Import the implicit conversions
    import org.apache.flink.streaming.api.scala._

    // Enable checkpointing, otherwise the generated files stay in the in-progress state
    streamEnv.enableCheckpointing(1000)

    // Data source
    val data: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)

    // Create the file rolling policy
    val rolling: DefaultRollingPolicy[StationLog, String] = DefaultRollingPolicy.create()
      .withInactivityInterval(2000) // interval with no activity
      .withRolloverInterval(2000)   // roll a new file every two seconds (important)
      .build()

    // Create the HDFS sink; note that Path here is Flink's org.apache.flink.core.fs.Path
    val hdfsSink = StreamingFileSink.forRowFormat[StationLog](
      new Path("hdfs://ShServer:9000/sink001/"),
      new SimpleStringEncoder[StationLog]("UTF-8"))
      .withBucketCheckInterval(1000)
      .withBucketAssigner(new MemberBucketAssigner)
      .withRollingPolicy(rolling)
      .build()

    // Add the sink
    data.addSink(hdfsSink)
    streamEnv.execute()
  }

  /**
   * Custom bucket assignment strategy: one bucket (directory) per hour.
   */
  class MemberBucketAssigner extends BucketAssigner[StationLog, String] {
    // Decide which bucket (sub-directory) a record goes to
    override def getBucketId(info: StationLog, context: BucketAssigner.Context): String = {
      val date = new Date(info.callTime)
      new SimpleDateFormat("yyyy-MM-dd/HH").format(date)
    }

    override def getSerializer: SimpleVersionedSerializer[String] = SimpleVersionedStringSerializer.INSTANCE
  }
}

3.2 Redis-based Sink

In addition to the built-in connectors, Flink has a number of additional connectors released through Apache Bahir, including:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

3.2.1 Dependency

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

3.2.2 Write the result to Redis

package com.chb.flink.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// The object is named RedisSinkDemo so that it does not shadow the imported RedisSink connector class
object RedisSinkDemo {
  def main(args: Array[String]): Unit = {
    // Initialize the Flink Streaming (stream) context execution environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Import the implicit conversions; importing them here avoids misleading error hints in IDEA
    import org.apache.flink.streaming.api.scala._

    // Read the data
    val stream = streamEnv.socketTextStream("hadoop01", 8888)

    // Transformation: word count
    val result = stream.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // Redis connection configuration
    val config = new FlinkJedisPoolConfig.Builder()
      .setDatabase(1)
      .setHost("hadoop01")
      .setPort(6379)
      .build()

    // Write the result to Redis
    result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
      override def getCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "t_wc")

      override def getKeyFromData(data: (String, Int)) = data._1 // the word

      override def getValueFromData(data: (String, Int)) = data._2 + "" // the number of occurrences of the word
    }))

    streamEnv.execute()
  }
}

3.3 Kafka Sink

3.3.1. The first type

package com.chb.flink.sink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

/**
 * Kafka Sink: the first way, writing values only
 */
object KafkaSinkByString {
  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1) // the default parallelism of each task is 1

    import org.apache.flink.streaming.api.scala._

    // Read netcat stream data (real-time stream)
    val stream1: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888)

    // Transformation: split each line into words
    val result = stream1.flatMap(_.split(","))

    // Write the data to Kafka (values only, no keys)
    result.addSink(new FlinkKafkaProducer[String]("hadoop01:9092", "t_topic", new SimpleStringSchema()))

    streamEnv.execute()
  }
}

3.3.2 The second type

package com.chb.flink.sink

import java.lang
import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

/**
 * Kafka Sink: the second way, writing keys and values
 */
object KafkaSinkByKeyValue {
  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1) // the default parallelism of each task is 1

    import org.apache.flink.streaming.api.scala._

    // Read netcat stream data (real-time stream)
    val stream1: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888)

    // Transformation: word count
    val result = stream1.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // Kafka producer configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hadoop01:9092")
    props.setProperty("key.serializer", classOf[StringSerializer].getName)
    props.setProperty("value.serializer", classOf[StringSerializer].getName)

    // Write the data to Kafka with both key and value
    result.addSink(new FlinkKafkaProducer[(String, Int)]("t_topic",
      new KafkaSerializationSchema[(String, Int)] {
        override def serialize(element: (String, Int), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
          new ProducerRecord("t_topic", element._1.getBytes, (element._2 + "").getBytes())
        }
      }, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)) // EXACTLY_ONCE semantics

    streamEnv.execute()
  }
}

3.4 Custom Sink

package com.chb.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.chb.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Write the StationLog data read from the custom Source into MySQL using Flink.
 * There are two ways to implement a sink:
 * 1. Implement the SinkFunction interface.
 * 2. Extend the RichSinkFunction class, which additionally provides lifecycle management.
 * If you need to create a connection object when the sink is initialized, the second way is better.
 * Case requirement: write StationLog objects into a MySQL database.
 */
object CustomJdbcSink {

  class MyCustomSink extends RichSinkFunction[StationLog] {
    var conn: Connection = _
    var pst: PreparedStatement = _

    // Called once when the sink is initialized: create the connection here
    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123123")
      pst = conn.prepareStatement(
        "insert into t_station_log(sid, call_out, call_in, call_type, call_time, duration) values(?, ?, ?, ?, ?, ?)")
    }

    // Called once for every element in the stream
    override def invoke(value: StationLog, context: SinkFunction.Context[_]): Unit = {
      pst.setString(1, value.sid)
      pst.setString(2, value.callOut)
      pst.setString(3, value.callIn)
      pst.setString(4, value.callType)
      pst.setLong(5, value.callTime)
      pst.setLong(6, value.duration)
      pst.executeUpdate()
    }

    override def close(): Unit = {
      pst.close()
      conn.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Initialize the Flink Streaming (stream) context execution environment
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Import the implicit conversions; importing them here avoids misleading error hints in IDEA
    import org.apache.flink.streaming.api.scala._

    val data: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)

    // Write the data to MySQL
    data.addSink(new MyCustomSink)
    streamEnv.execute()
  }
}

4. DataStream transformation operators

These are straightforward; refer to the API documentation for the full list. A minimal sketch of the most common transformations is shown below.
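As a quick reference, here is a minimal sketch of the most common DataStream transformations (flatMap, filter, map, keyBy, reduce); the socket host, port, and input format are assumptions for illustration.

import org.apache.flink.streaming.api.scala._

object TransformationSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Assumed input: comma-separated words arriving on a socket
    val lines = env.socketTextStream("hadoop01", 8888)

    lines
      .flatMap(_.split(","))                  // flatMap: one line -> many words
      .filter(_.nonEmpty)                     // filter: drop empty strings
      .map((_, 1))                            // map: word -> (word, 1)
      .keyBy(_._1)                            // keyBy: partition the stream by word
      .reduce((a, b) => (a._1, a._2 + b._2))  // reduce: running word count per word
      .print()

    env.execute("transformation-sketch")
  }
}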

5. Function classes and rich function classes

Almost all of the operators in the previous section can take a custom function class or a rich function class as a parameter, because Flink exposes interfaces for both kinds of functions. The common function interfaces are:

  • MapFunction
  • FlatMapFunction
  • ReduceFunction
  • ...

Rich function interfaces differ from other regular function interfaces in that they can capture the context of the runtime environment, manage State within the context, and have lifecycle methods that enable more complex functionality. The interfaces of rich functions are:

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction
  • ...

5.1. Example of a common function class: output the start time and end time of each call in the specified time format (a minimal sketch follows)
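The code for this example is not included in the post, so here is a minimal sketch assuming the StationLog class defined earlier; the class and object names, the sample record, and the interpretation of duration as seconds are assumptions.

import java.text.SimpleDateFormat
import java.util.Date

import com.chb.flink.source.StationLog
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

// A plain (non-rich) function class: formats the start time and end time of each call
class CallMapFunction(format: SimpleDateFormat) extends MapFunction[StationLog, String] {
  override def map(log: StationLog): String = {
    val start = log.callTime
    val end = log.callTime + log.duration * 1000 // assumes duration is in seconds
    "call start: " + format.format(new Date(start)) + ", call end: " + format.format(new Date(end))
  }
}

object CommonFunctionExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // One made-up record instead of reading station.log
    val data: DataStream[StationLog] = env.fromCollection(Array(
      new StationLog("001", "186", "189", "success", System.currentTimeMillis(), 30)))

    // Define the time output format and apply the function class
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    data.filter(_.callType.equals("success"))
      .map(new CallMapFunction(format))
      .print()

    env.execute()
  }
}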

5.2 Rich function class example: convert the phone numbers of successful calls into real user names

The user table that maps each phone number to a real user name is stored in MySQL (table t_phone with columns phone_number and name, as queried in the code below).

Since you need to query data from the database, you need to create a connection, and the code that creates the connection must live in the open() lifecycle method. That is why a rich function class is needed here.

A Rich Function has a lifecycle. The typical lifecycle methods are:

  • The open() method initializes the rich function and is called before an operator such as map or filter is invoked.
  • The close() method is the last method called in the lifecycle and does cleanup work.
  • The getRuntimeContext() method provides information about the function's RuntimeContext, such as the parallelism the function runs with, the name of the task, and access to State.
package com.chb.flink.func

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import com.chb.flink.source.StationLog
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Rich function example: replace the phone numbers of successful calls with real user names looked up in MySQL.
 */
object TestFunction {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Implicit conversions
    import org.apache.flink.streaming.api.scala._

    val data: DataStream[StationLog] = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
      .map(line => {
        val arr = line.split(",")
        new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })

    // Keep only the successful calls and replace the numbers with user names
    data.filter(_.callType.equals("success"))
      .map(new CallRichMapFunction())
      .print()

    streamEnv.execute()
  }
}

// Rich function: looks up the real user names for the caller and the callee in MySQL
class CallRichMapFunction() extends RichMapFunction[StationLog, StationLog] {
  var conn: Connection = _
  var pst: PreparedStatement = _

  // Lifecycle method: called once before the function is used; create the connection here
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123456")
    pst = conn.prepareStatement("select name from t_phone where phone_number = ?")
  }

  override def map(in: StationLog): StationLog = {
    // Look up the caller
    pst.setString(1, in.callOut)
    val set1: ResultSet = pst.executeQuery()
    if (set1.next()) {
      in.callOut = set1.getString(1)
    }
    // Look up the callee
    pst.setString(1, in.callIn)
    val set2: ResultSet = pst.executeQuery()
    if (set2.next()) {
      in.callIn = set2.getString(1)
    }
    in
  }

  // Lifecycle method: called once at the end; release the resources
  override def close(): Unit = {
    pst.close()
    conn.close()
  }
}

The low-level ProcessFunction API

ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of all streaming applications:

  • Access the Event data (e.g. Event time, Event current Key, etc.)
  • Manage State (only on a Keyed Stream)
  • Manage timers (register a timer, delete a timer, etc.)

All in all, ProcessFunction is Flink’s lowest level API and its most powerful.

For example: monitor the call log of every phone number, and if a call fails and no successful call follows within 5 seconds, send a warning message.

package com.chb.flink.func

import java.text.SimpleDateFormat
import java.util.Date

import com.chb.flink.source.StationLog
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
 * Monitor each phone number: if a call fails and no successful call follows within 5 seconds, emit a warning.
 */
object TestProcessFunction {
  def main(args: Array[String]): Unit = {
    // Initialize the Flink Streaming context execution environment
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Import the implicit conversions
    import org.apache.flink.streaming.api.scala._

    // Read data from a socket
    val data = streamEnv.socketTextStream("10.0.0.201", 8888)
      .map(line => {
        val arr = line.split(",")
        new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })

    // Key the stream by the caller number and apply the monitoring process function
    data.keyBy(_.callOut)
      .process(new MonitorCallFail())
      .print()

    streamEnv.execute()
  }

  // Monitor call failures per phone number
  class MonitorCallFail() extends KeyedProcessFunction[String, StationLog, String] {
    // State that stores the timestamp of the registered timer
    lazy val timeState: ValueState[Long] =
      getRuntimeContext.getState(new ValueStateDescriptor[Long]("time", classOf[Long]))

    override def processElement(value: StationLog,
                                context: KeyedProcessFunction[String, StationLog, String]#Context,
                                collector: Collector[String]): Unit = {
      val time = timeState.value()
      if (value.callType.equals("fail") && time == 0) { // the first failed call
        // Register a timer 5 seconds from the current processing time
        val now = context.timerService().currentProcessingTime()
        val onTime = now + 5000L
        context.timerService().registerProcessingTimeTimer(onTime)
        println("first failure at: " + new Date())
        timeState.update(onTime)
      }
      if (!value.callType.equals("fail") && time != 0) {
        // A non-failing call arrived in time: delete the timer and clear the state
        context.timerService().deleteProcessingTimeTimer(time)
        timeState.clear()
      }
    }

    // Triggered when the timer fires: emit the warning
    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val warnStr = "warning time: " + df.format(new Date(timestamp)) + ", phone number: " + ctx.getCurrentKey
      out.collect(warnStr)
      timeState.clear()
    }
  }
}

Side Output stream

When Flink processes a data stream, we often need to separate different types of data coming from a single source.

  • If the filter operator is used to split the source, the stream has to be filtered once per branch, which effectively copies the data stream and wastes performance unnecessarily.
  • Side output is a splitting mechanism that divides the data stream without replicating it.
  • Another use of Flink's side output is handling late data, so that late records do not have to be discarded.

Case: based on the base-station call log, output the successful calls as the main stream and the unsuccessful calls as a side output stream.

package com.chb.flink.func

import com.chb.flink.source.StationLog
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

/**
 * Output the successful calls as the main stream and the unsuccessful calls as a side output stream.
 */
object TestSideOutputStream {
  // The implicit conversions must be imported before the output tag is defined
  import org.apache.flink.streaming.api.scala._

  // A side output stream first needs a tag that identifies it
  var notSuccessTag = new OutputTag[StationLog]("not_success")

  def main(args: Array[String]): Unit = {
    // Initialize the Flink Streaming context execution environment
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Read the file data
    val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
      .map(line => {
        val arr = line.split(",")
        new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })

    // The main stream: successful calls
    val mainStream: DataStream[StationLog] = data.process(new CreateSideOutputStream(notSuccessTag))

    // Get the side output stream: unsuccessful calls
    val sideOutput: DataStream[StationLog] = mainStream.getSideOutput(notSuccessTag)

    mainStream.print("main")
    sideOutput.print("sideoutput")
    streamEnv.execute()
  }

  class CreateSideOutputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {
    override def processElement(value: StationLog,
                                ctx: ProcessFunction[StationLog, StationLog]#Context,
                                out: Collector[StationLog]): Unit = {
      if (value.callType.equals("success")) {
        out.collect(value) // successful calls go to the main stream
      } else {
        ctx.output(tag, value) // everything else goes to the side output stream
      }
    }
  }
}

There is also a video version on my Bilibili (B站) channel, chbxw. I hope you will support it. Thank you.


Flink Directory Guide

Follow my WeChat official account [Big Data] for more hands-on content.