Introduction to the flink-kafka-connector

What is flink-kafka-connector used for?

The connector combines Kafka's partition mechanism with Flink's parallelism mechanism to achieve data recovery. Kafka can serve as both Flink's source and Flink's sink, and when a task fails, the application can be restored from the saved Kafka offsets.

A brief introduction to Kafka

There will be a feature article on Kafka, but here are a few essential concepts to know.

As the name implies, a Producer is a component that produces messages. Its main job is to continuously produce messages and send them to the message queue. Producers can send messages of various types, such as plain string messages as well as binary messages. The producer is the data source of the message queue; the message queue can only keep processing messages if producers keep sending messages to it.

A Consumer is a component that continuously consumes (retrieves) messages from the message queue (Kafka itself). In other words, producers constantly send messages to the message queue, while consumers constantly fetch messages from it.

A Topic is a very important concept in Kafka. A topic is a logical concept that categorizes and stores the messages themselves. Multiple producers can send messages to a topic, and multiple consumers can consume the messages within a topic. Topics also have the concepts of partitions and replicas. Topics are closely related to messages: every message in Kafka belongs to a topic, and a topic can hold any number of messages.
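To make these concepts concrete, here is a minimal sketch that uses the plain Kafka Java client (not Flink) to send string messages to a topic. It assumes the kafka-clients dependency is on the classpath; the class name is illustrative, and the topic name "test" matches the one created below:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PlainProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The producer keeps sending string messages to the "test" topic;
        // any consumer subscribed to that topic can then fetch them.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test", "message-" + i));
            }
        }
    }
}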

Basic Kafka operations

Start ZooKeeper: nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

Start the server: nohup bin/kafka-server-start.sh config/server.properties &

Create a topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List topics: bin/kafka-topics.sh --list --zookeeper localhost:2181

Start a producer: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Start a consumer: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Delete a topic: bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topn

Notes on consuming Kafka from Flink

  • setStartFromGroupOffsets()

    The default behavior: consumption resumes from the last committed offset. If the application is started for the first time and no committed offset can be read, consumption starts according to the value of auto.offset.reset.

  • setStartFromEarliest() starts consuming from the earliest data, ignoring any stored offset information

  • setStartFromLatest() starts consuming from the latest data, ignoring any stored offset information

  • setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>) starts consuming from the specified offsets

  • When checkpointing is enabled, the Kafka consumer periodically stores the Kafka offsets together with the state of the other operators. When a job fails and restarts, Flink recovers from the latest checkpoint and re-consumes the data in Kafka from the stored offsets.

  • Enable checkpointing with env.enableCheckpointing(5000); // checkpoint every 5s (the sketch below puts these options together)
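A minimal sketch combining the start-position options with checkpointing, assuming the flink-connector-kafka_2.11 1.7.0 dependency used later in this article; the group id and the specific offsets are illustrative values:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class StartOffsetDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5s, so offsets are stored with operator state

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "demo-group"); // illustrative group id

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);

        // Pick exactly one start position:
        consumer.setStartFromGroupOffsets();   // default: resume from committed group offsets
        // consumer.setStartFromEarliest();    // ignore stored offsets, read from the beginning
        // consumer.setStartFromLatest();      // ignore stored offsets, read only new data
        // Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
        // offsets.put(new KafkaTopicPartition("test", 0), 23L); // illustrative offset
        // consumer.setStartFromSpecificOffsets(offsets);

        env.addSource(consumer).print();
        env.execute();
    }
}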

Set up a standalone Kafka environment

I have kafka_2.11-2.1.0 installed locally.

Start Zookeeper and Kafka Server:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &

Create a topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Practical cases

All the code is on my WeChat public account; reply "Flink" there to download it.


Kafka as Flink Sink

First, POM dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

Write data to Kafka:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class KafkaProducer {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A source with parallelism 1 that keeps producing data
        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Exactly-once variant (expanded in the sketch below):
        // new FlinkKafkaProducer("topn", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("test", new SimpleStringSchema(), properties);

        // Write the event timestamp of each record into Kafka:
        // producer.setWriteTimestampToKafka(true);

        text.addSink(producer);
        env.execute();
    }
}
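The constructor commented out in main() is the exactly-once variant of this sink. As a hedged sketch of what swapping it in would look like (KeyedSerializationSchemaWrapper comes from org.apache.flink.streaming.util.serialization, and EXACTLY_ONCE relies on Kafka transactions, so it needs a 0.11+ broker):

// Sketch: replace the producer construction above with the exactly-once variant.
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "test",                                                       // target topic
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        properties,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);                    // transactional writes

// Optionally write each record's event timestamp into Kafka
producer.setWriteTimestampToKafka(true);

text.addSink(producer);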

In particular, we have implemented a MyNoParalleSource with a parallelism of 1 to produce the data:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// A source that is used with a parallelism of 1
public class MyNoParalleSource implements SourceFunction<String> {

    private boolean isRunning = true;

    /**
     * The main method: it is called once when the source starts.
     * In most cases you implement a loop here so the source keeps producing data.
     * @param ctx the context used to emit elements
     * @throws Exception
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            List<String> books = new ArrayList<>();
            books.add("Python from starter to quit");
            books.add("Java from entry to abandonment");
            books.add("Php from getting started to giving up");
            books.add("C++ from entry to abandonment");
            books.add("Scala from getting started to giving up");
            // nextInt(5) returns 0-4, picking one title at random
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));
            // Emit one record every 2 seconds
            Thread.sleep(2000);
        }
    }

    /**
     * Called when the job is cancelled; stops the run() loop.
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}

The code implements a source that sends one of the book titles <Python from starter to quit>, <Java from entry to abandonment>… at random every two seconds.

Then right-click to run our program and the console output looks like this:

The job starts and begins producing a stream of data.

Then let’s look at the kafka test topic with the following command:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

The output is as follows:

Kafka as Flink Source

Straight to the code:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaConsumer {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        // Start consuming from the earliest offset
        consumer.setStartFromEarliest();

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        //stream.map();
        env.execute();
    }
}
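The commented-out stream.map() above marks where a transformation would go. As a minimal sketch (MapFunction comes from org.apache.flink.api.common.functions; the tag string is illustrative), tagging each record before printing could look like this:

// Hypothetical transformation in place of stream.print() above:
DataStream<String> tagged = stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) {
        return "from-kafka> " + value;
    }
});
tagged.print();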

The console output is as follows: