Preface

This article introduces Kafka 0.11 as the source and sink of our Flink job, turning it into true real-time streaming computing

Transformation

Define the message format

${timestamp},${word}, separated by a comma. It looks the same as when we defined the data format earlier. The important thing here is the choice of separator: since we are not dealing with a complex business scenario, there is no need for JSON; an ordinary column separator, such as the comma used in CSV, is enough. Just make sure that ${word} never contains the separator you choose.
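For example, a single message on the wire would look like this (sample values made up):

1596161123000,flink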

Switch the message input to Kafka

With Kafka introduced, we need to pass in connection parameters, which we parse with Flink's official ParameterTool:

val params = ParameterTool.fromArgs(args)

We then create a FlinkKafkaConsumer011 object from these parameters and add it to the environment via env.addSource:

val dataStream = env.addSource(
  new FlinkKafkaConsumer011[String](
    params.getRequired("input.topic"), new SimpleStringSchema, params.getProperties))
  .uid("add-source")
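For reference, here is a minimal, self-contained sketch of the source setup with the imports it relies on; the object name and the job skeleton around it are assumptions, not the article's actual code:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object HotWordStatisticJob {
  def main(args: Array[String]): Unit = {
    // bootstrap.servers, group.id and the topic names all arrive as --key value pairs
    val params = ParameterTool.fromArgs(args)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream = env.addSource(
      new FlinkKafkaConsumer011[String](
        params.getRequired("input.topic"), new SimpleStringSchema, params.getProperties))
      .uid("add-source")

    // ... parsing, windowed counting and the Kafka sink go here ...
    env.execute("hot word statistic job")
  }
}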

Here we simply use SimpleStringSchema to deserialize Kafka messages as strings, so we need a map operation to convert the data:

.map { x =>
  val arr = x.split(",")
  if (arr.length != 2) {
    println(s"Failed to parse $x")
    null
  } else {
    (arr(0), arr(1))
  }
}
.filter(_ != null)

Error tolerance is implemented here: if parsing fails we return null, meaning a null record is passed to the next operator, where the filter drops it.
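Passing nulls between operators works, but a cleaner pattern in the Scala API is flatMap with a collection result, which drops bad records without a separate filter. A minimal sketch, not the article's actual code:

.flatMap { x =>
  x.split(",") match {
    case Array(timestamp, word) => Seq((timestamp, word))
    case _ =>
      println(s"Failed to parse $x")
      Seq.empty
  }
}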

Now that we’re done with this part, we’re ready to run a quick test.

--bootstrap.servers localhost:9092
--group.id flink
--input.topic flink-hotWordStatisticJob-source

We save the parameters in a .txt file for later copying. When debugging locally, you can paste them into the program-arguments input box as shown in the picture, then run the main method.

You are advised to use the Kafka command-line producer, as follows:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-hotWordStatisticJob-source
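Once the producer is up, type messages in the format defined above, one per line, for example (sample values made up):

1596161123000,flink
1596161125000,kafka
1596161127000,flink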

That completes the adjustment of the input side of the job; on to the next step.

Switch the message output to Kafka

We assign the previously computed DataStream to resultDataStream and then attach a data outlet, i.e. a sink, to it. Of course, the records must first be converted to strings:

resultDataStream
  .map(x => s"${x._2},${x._3}")
  .addSink(
    new FlinkKafkaProducer011[String](
      params.getRequired("output.topic"), new SimpleStringSchema, params.getProperties))
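The constructor used above gives at-least-once delivery by default. The 0.11 producer also supports exactly-once through its Semantic argument; a minimal sketch, assuming checkpointing is enabled on the job:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

// Exactly-once relies on Flink checkpoints plus Kafka transactions
resultDataStream
  .map(x => s"${x._2},${x._3}")
  .addSink(
    new FlinkKafkaProducer011[String](
      params.getRequired("output.topic"),
      new KeyedSerializationSchemaWrapper(new SimpleStringSchema),
      params.getProperties,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE))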

Add a parameter

--output.topic flink-hotWordStatisticJob-sink

Final effect

Send an input message through the producer, then watch the job's output with the Kafka console consumer:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-hotWordStatisticJob-sink
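If everything is wired correctly, the consumer prints one line per result in the shape emitted by the sink's map; assuming the tuple fields are the word and its count, the output looks like this (illustrative values):

flink,3
kafka,1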

In closing

If the product requirement were to keep accumulating hot-word statistics indefinitely, the code would be ready to deploy to production at this point; of course, the requirements will change later.