A brief introduction to Kafka

Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn, later contributed to the Apache Software Foundation, and eventually became a top-level open source project. Kafka is used to build real-time data pipelines and streaming applications. It is horizontally scalable, fault tolerant, and extremely fast, and it has been widely adopted.

Kafka is not only a distributed messaging system but also supports stream processing, so before introducing how Kafka is used with Apache Flink, let's walk through a simple Kafka example to understand what Kafka is.

Installation

This article is not a systematic, detailed introduction to Kafka; its goal is to give you an intuitive feel for Kafka so that you can use it well with Apache Flink. We therefore install Kafka in the simplest possible way.

  • Download the binary package:

curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
  • To install Kafka, simply unzip the TGZ download as follows:

jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz
jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
LICENSE    NOTICE    bin    config    libs    site-docs

The bin directory contains all of Kafka's management scripts, including the one for the Kafka Server, which we will start next.

  • Start the Kafka Server. Kafka is a publish-subscribe system: to subscribe to messages, a server must first be running, so let's start a Kafka Server instance. Kafka depends on ZooKeeper; setting up a production ZooKeeper cluster is outside the scope of this article, so we use the script shipped with Kafka to start a single-node ZooKeeper instance, as follows:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &
[2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
[2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

After starting, ZooKeeper binds port 2181 (default). Next we start Kafka Server as follows:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
[2019-01-13 09:09:16,937] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer)
[2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
...
[2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

If all goes well, Kafka’s installation is complete.

Create a Topic

Kafka is a message subscription system, so we first create a Topic called flink-topic. In a new terminal, execute the following command:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-topic
Created topic "flink-topic".

In Kafka Server’s terminal, the following information is displayed:

...
[2019-01-13 09:13:31,156] INFO Created log for partition flink-topic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
...

This log shows the basic property configuration of flink-topic, such as message compression, message format, number of replicas, and so on.

Besides looking at the log, we can also check from the command line whether flink-topic was created successfully, as follows:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181
flink-topic

If flink-topic is printed, our Topic was created successfully.

So where is a Topic stored? How does Kafka publish and subscribe to messages? To build some intuition, let's briefly go over Kafka's architecture:

Kafka uses ZooKeeper to store cluster metadata. A Kafka cluster can contain multiple Kafka Server instances, each of which is called a Broker. The Topic we created can live on one or more Brokers. Producers push messages to Brokers, and consumers pull messages from them.

Send a message

How do you send a message to an existing Topic? You can of course write code that sends messages through the API; you can also send messages conveniently from the command line, as follows:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg
>Kafka connector

Above, we sent two messages, Kafka test msg and Kafka connector, to the flink-topic Topic.
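
The same two messages could also be sent programmatically, as mentioned above. Below is a minimal sketch using the Kafka Java client, assuming the kafka-clients dependency (matching the broker version, e.g. 2.1.0) is on the classpath; the class name SimpleKafkaProducer is just for illustration.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Send the same two test messages to flink-topic
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("flink-topic", "Kafka test msg"));
            producer.send(new ProducerRecord<>("flink-topic", "Kafka connector"));
        }
    }
}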

Read the message

How do we read messages from a given Topic? This can also be done via the API or the command line. We read the messages of flink-topic from the command line, as follows:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector

The --from-beginning option tells the consumer to read from the beginning of the Topic rather than only new messages.
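
For completeness, reading the Topic can also be done with the Kafka Java client. The sketch below again assumes the kafka-clients dependency is on the classpath; the group id flink-topic-reader is an arbitrary name chosen for this example, and auto.offset.reset=earliest mimics --from-beginning for a new consumer group.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "flink-topic-reader");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink-topic"));
            // A single bounded poll is enough for this demonstration
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}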

Flink Kafka Connector

With the Kafka environment above in place, we can now look at how to use the Flink Kafka Connector. The basics of Flink connectors will be explained in Apache Flink series (14) – Connectors; here we introduce the Kafka Connector directly.

Apache Flink provides connectors for several Kafka versions. This article uses Flink 1.7.0 as an example.

Maven dependency

To use the Kafka Connector we need to add its dependency in our POM, as follows:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

The Flink Kafka Consumer needs to know how to convert the binary data in Kafka into Java/Scala objects. A DeserializationSchema lets the user specify such a schema: its T deserialize(byte[] message) method is called for each Kafka message, with the raw bytes from Kafka passed in.

Examples

Our example reads data from Kafka and, after some simple processing, writes it back to Kafka. For that we need to create a second Topic to write to, like this:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-topic-output

So in the example, the Source uses flink-topic and the Sink uses flink-topic-output.

Simple ETL

We assume that Kafka stores simple strings, so we need an implementation that can serialize and deserialize strings, that is, a class that implements both SerializationSchema and DeserializationSchema. Since our example deals with strings, we define a custom implementation class, KafkaMsgSchema, and then write the main Flink program.

  • KafkaMsgSchema – Complete code

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {
    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public KafkaMsgSchema() {
        // Default to UTF-8 encoding
        this(Charset.forName("UTF-8"));
    }

    public KafkaMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    public String deserialize(byte[] message) {
        // Deserialize the Kafka message into a Java object
        return new String(message, charset);
    }

    public boolean isEndOfStream(String nextElement) {
        // The stream never ends
        return false;
    }

    public byte[] serialize(String element) {
        // Serialize the Java object into a Kafka message
        return element.getBytes(this.charset);
    }

    public TypeInformation<String> getProducedType() {
        // Define the TypeInformation of the produced data
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }
}
  • Main program – complete code

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaExample {
    public static void main(String[] args) throws Exception {
        // Parse user parameters from the command line
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source topic
        String sourceTopic = "flink-topic";
        // Sink topic
        String sinkTopic = "flink-topic-output";
        // Broker address
        String broker = "localhost:9092";

        // Properties - in production these can be passed in on the command line
        Properties p = parameterTool.getProperties();
        p.putAll(parameterTool.getProperties());
        p.put("bootstrap.servers", broker);

        env.getConfig().setGlobalJobParameters(parameterTool);

        // Create the Kafka consumer
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(sourceTopic, new KafkaMsgSchema(), p);
        // Uncomment to read from the earliest data
        // consumer.setStartFromEarliest();

        DataStream<String> input = env.addSource(consumer);

        // Simple processing: prefix every message with "Flink study "
        DataStream<String> result = input.map(new MapFunction<String, String>() {
            public String map(String s) throws Exception {
                String msg = "Flink study ".concat(s);
                System.out.println(msg);
                return msg;
            }
        });

        // Create the Kafka producer
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(sinkTopic,
                new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // Write the result to the Kafka topic
        result.addSink(producer);

        // Execute the job
        env.execute("Kafka Example");
    }
}

Now run the main program.



The procedure I used to test the job is as follows:

  1. Start console consumers pulling from both flink-topic and flink-topic-output;

  2. Send the test message only for test to flink-topic from the command line;

  3. Verify from the console output that only for test arrived on flink-topic;

  4. The simplest Flink job, source->map->sink, maps the test message to "Flink study ".concat(s);

  5. Print the data of the sink Topic from the command line.

Built-in Schemas

Apache Flink provides the following three built-in schemas for common message formats:

  • TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema), which creates a schema based on Flink's TypeInformation. This is useful if the data is both written and read by Flink (a usage sketch follows the dependency snippet below).

  • JsonDeserializationSchema (and JSONKeyValueDeserializationSchema), which deserializes JSON into an ObjectNode object; fields can then be accessed with objectNode.get("field").as(Int/String/...)(). The KeyValue ObjectNode contains "key" and "value" fields with all the message fields, plus an optional "metadata" field that exposes the offset/partition/topic of the message.

  • AvroDeserializationSchema, which reads data serialized in the Avro format using a statically provided schema. It can infer the schema from Avro-generated classes (AvroDeserializationSchema.forSpecific(...)), or it can work with GenericRecords using a manually provided schema (AvroDeserializationSchema.forGeneric(...)).

To use the Avro schema, add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.7.0</version>
</dependency>
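
As a usage illustration for the first built-in schema listed above, the sketch below wires a TypeInformationSerializationSchema into a Kafka consumer like the one from the earlier example. This is only a sketch under assumptions: the exact package of the schema class has moved between Flink releases (older versions keep it under org.apache.flink.streaming.util.serialization), so check your Flink version.

import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class BuiltInSchemaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");

        // Schema derived from Flink's own TypeInformation; best suited when
        // both the writer and the reader of the topic are Flink jobs.
        TypeInformationSerializationSchema<String> schema =
                new TypeInformationSerializationSchema<>(BasicTypeInfo.STRING_TYPE_INFO, env.getConfig());

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("flink-topic", schema, p);

        env.addSource(consumer).print();
        env.execute("Built-in schema sketch");
    }
}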

Read position configuration

Apache Flink's FlinkKafkaConsumer provides several convenient ways to configure the position from which to start consuming Kafka data:

  • consumer.setStartFromEarliest() – start from the earliest record;

  • consumer.setStartFromLatest() – start from the latest record;

  • consumer.setStartFromTimestamp(...) – start from the specified epoch timestamp (milliseconds);

  • consumer.setStartFromGroupOffsets() – the default behavior: continue from the offsets last committed by the consumer group.

The start position can also be specified exactly per partition, as in the following code:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // The first partition starts from offset 23
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L); // The second partition starts from offset 31
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L); // The third partition starts from offset 43
consumer.setStartFromSpecificOffsets(specificStartOffsets);

Partitions that are not explicitly listed fall back to the default setStartFromGroupOffsets behavior.

Topic discovery

The Flink Kafka consumer supports automatic Topic discovery: simply create the FlinkKafkaConsumer with a regular expression, for example:

// Create the consumer with a topic name pattern
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(
        java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
        new KafkaMsgSchema(),
        p);

In the example above, when the job starts running, the consumer subscribes to all Topics whose names match the specified regular expression (starting with the value of sourceTopic and ending with a single digit).
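
Discovering new matching topics and partitions after the job has started is controlled by a consumer property. A minimal sketch is shown below; the property key is the one used by FlinkKafkaConsumerBase, the 30-second interval is an arbitrary choice for illustration, and discovery stays disabled when the property is not set.

Properties p = new Properties();
p.put("bootstrap.servers", "localhost:9092");
// Re-scan Kafka for new matching topics/partitions every 30 seconds
p.setProperty("flink.partition-discovery.interval-millis", "30000");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(
        java.util.regex.Pattern.compile("flink-topic-[0-9]"),
        new KafkaMsgSchema(),
        p);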

Defining Watermarks (for window operations)

The Kafka Connector is not limited to the simple data extraction above. More often we want to perform event-time window operations on Kafka data, and for that we need to define Watermarks in the Flink Kafka source.

To use event time, the Kafka data must carry a time attribute. Assume that our data is in String#Long format, for example only for test#1000; we then treat the Long part as the event-time column.

  • To parse this data format we need a custom schema, such as KafkaWithTsMsgSchema, which parses String#Long into a Java Tuple2<String, Long>, as follows:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {
    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public KafkaWithTsMsgSchema() {
        this(Charset.forName("UTF-8"));
    }

    public KafkaWithTsMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    public Tuple2<String, Long> deserialize(byte[] message) {
        String msg = new String(message, charset);
        String[] dataAndTs = msg.split("#");
        if(dataAndTs.length == 2){
            return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
        }else{
            // In production a runtime exception should be thrown here
            System.out.println("Fail due to invalid msg format.. [" + msg + "]");
            return new Tuple2<String, Long>(msg, 0L);
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {
        return false;
    }

    public byte[] serialize(Tuple2<String, Long> element) {
        return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }

    @Override
    public TypeInformation<Tuple2<String, Long>> getProducedType() {
        return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    }
}
  • Watermark generation

To extract the timestamp and generate Watermarks, we need to implement a custom timestamp extractor and Watermark generator. Apache Flink provides two interfaces for this:

  • AssignerWithPunctuatedWatermarks – generates a Watermark for every record;

  • AssignerWithPeriodicWatermarks – generates Watermarks periodically (a sketch of this approach follows the punctuated example below).

    Taking AssignerWithPunctuatedWatermarks as an example, we write a custom timestamp extractor and Watermark generator. The code is as follows:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class KafkaAssignerWithPunctuatedWatermarks
        implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {
        // Create a Watermark with the extracted timestamp
        return new Watermark(l);
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> o, long l) {
        // Use the second field of the tuple as the event timestamp
        return o.f1;
    }
}
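
As a sketch of the periodic variant mentioned above, the class below extends Flink's BoundedOutOfOrdernessTimestampExtractor, which implements AssignerWithPeriodicWatermarks; the 3-second out-of-orderness bound is an assumption made purely for illustration.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KafkaPeriodicWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>> {

    public KafkaPeriodicWatermarks() {
        // Allow events to arrive up to 3 seconds out of order (assumed bound)
        super(Time.seconds(3));
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> element) {
        // The second field of the tuple carries the event timestamp
        return element.f1;
    }
}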
  • Main program – complete code. We compute a 1-second tumbling window and take the maximum value within each window. The complete program is as follows:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaWithEventTimeExample {
    public static void main(String[] args) throws Exception {
        // Parse user parameters from the command line
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Source topic
        String sourceTopic = "flink-topic";
        // Sink topic
        String sinkTopic = "flink-topic-output";
        // Broker address
        String broker = "localhost:9092";

        // Properties - in production these can be passed in on the command line
        Properties p = parameterTool.getProperties();
        p.putAll(parameterTool.getProperties());
        p.put("bootstrap.servers", broker);

        env.getConfig().setGlobalJobParameters(parameterTool);

        // Create the Kafka consumer
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>(sourceTopic,
                new KafkaWithTsMsgSchema(), p);

        TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>(
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);

        DataStream<Tuple2<String, Long>> input = env
                .addSource(consumer).returns(typeInformation)
                // Extract the timestamp and generate the Watermark
                .assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());

        // A 1-second tumbling window that keeps the maximum value
        DataStream<Tuple2<String, Long>> result = input
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
                .max(0);

        // Create the Kafka producer
        FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>(sinkTopic,
                new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // Write the result to the Kafka topic
        result.addSink(producer);

        // Execute the job
        env.execute("Kafka With Event-time Example");
    }
}

A test run proceeds as follows.

To clarify, the messages we enter and the Watermarks they produce are:

Msg Watermark
E#1000000 1000000
A#3000000 3000000
B#5000000 5000000
C#5000100 5000100
E#5000120 5000120
A#7000000 7000000

Looking at the data between 5000000 and 7000000, the messages B#5000000, C#5000100 and E#5000120 fall into the same window. Computing the MAX over the string field, string comparison makes E the largest, so the window's output is E#5000120.

Timestamps carried by Kafka

Since Kafka 0.10, messages can carry their own timestamps, which means you do not have to explicitly store a timestamp column inside the message payload. This is mainly convenient when Flink is used for both writing and reading; in general, the example above is sufficient.
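
As a sketch of how that could look, assuming the Flink Kafka consumer attaches Kafka's record timestamps (as the 0.10+ connectors do), so that they arrive in the previousElementTimestamp argument, a timestamp assigner could simply reuse them instead of parsing the payload:

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class KafkaRecordTimestampAssigner implements AssignerWithPunctuatedWatermarks<String> {

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        // Emit a Watermark for every record, based on the extracted timestamp
        return new Watermark(extractedTimestamp);
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        // Reuse the timestamp the Kafka record already carries
        return previousElementTimestamp;
    }
}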

Summary

This article focuses on how Kafka is used with Flink. It starts with a minimal Kafka installation and a command-line demonstration of sending and receiving messages, then walks through a simple data-extraction example and an event-time window example to give you an intuitive sense of how to use Kafka in Apache Flink.


Articles you may be interested in:

  • Flink introduction

  • Flink DataSet & DataStream API

  • Flink cluster deployment

  • Flink Restart policy

  • Flink distributed cache

  • Time in Flink

  • Windows in Flink

  • Flink time stamps and watermarks

  • Flink broadcast variables

  • Flink-Kafka-connector

  • Flink-Table&SQL

  • Flink hands-on project – top-selling items list

  • Flink-Redis-Sink

  • Flink consumes Kafka and writes to MySQL

More practical cases will be updated later…