We all know that Kafka offers high throughput, but does Kafka lose messages? Can it deliver the same message more than once?

Many companies must guarantee that messages are neither lost nor repeated because of their business requirements. Take a real-time UAV monitoring system: when a drone intrudes into an airport area, an alarm must be raised immediately, so the message cannot be lost, and when the drone leaves the no-fly zone, the alarm must be cleared. If messages can be repeated, we have to write our own deduplication logic, which can easily become too complex to maintain. If we can guarantee that each message is processed exactly once, everything becomes much easier.

Let’s take a quick look at message delivery semantics and Kafka’s messaging mechanism.

The first thing we need to understand is message delivery semantics.

This is a general concept: the guarantee a system gives about a message while it is being delivered.

There are three kinds:

At most once: a message may be lost, but it is never processed more than once.

May be lost, never repeated.

At least once: a message is never lost, but it may be processed more than once.

May be repeated, never lost.

Exactly once: a message is processed once and only once.

Never lost, never repeated.

Kafka actually delivers each message twice: once when a producer sends it to Kafka, and once when a consumer reads it from Kafka.

Both hops affect the final result.

Only when both hops are exactly once is the end-to-end result exactly once.

If either hop loses or repeats a message, the end result may be a lost or repeated message.

1. Producer-side messaging

Here’s the code on the producer side:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Connection and delivery settings
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
properties.put("acks", "all");   // wait for all in-sync replicas
properties.put("retries", 0);    // no automatic retries in this example
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// Send 600 test messages (value only, no key) to the topic
for (int i = 1; i <= 600; i++) {
    kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613" + i));
    System.out.println("testkafka" + i);
}
kafkaProducer.close();

The acks parameter has three possible values:

0: The producer does not wait for any response from the broker, so the callback carries no useful result. There is no guarantee that the message was written, but throughput is the highest.

all or -1: The leader broker responds only after it has written the message and all in-sync replicas (ISR) have written it as well. The message is not lost as long as one replica in the ISR survives, but throughput is the lowest.

1: The leader broker responds as soon as it has written the message itself, without waiting for the other replicas in the ISR. The message is not lost as long as the leader broker stays alive, which balances durability and throughput. This was the default value before Kafka 3.0; since 3.0 the default is all.

Therefore, as long as the cluster is stable, messages are not lost if acks is set to anything other than 0.

However, if a message is written successfully but the producer receives no response because of a network problem, the producer retries until the network recovers, and the same message is written more than once. This is at least once.

In Kafka versions before 3.0 the default value of acks for the producer is 1, so the default producer level is at least once, not exactly once.
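The duplicate described above can be reproduced with a toy model. The sketch below is not the Kafka client: `ToyBroker`, `send`, and the dropped-acknowledgement counter are hypothetical names made up for illustration. It only shows why a retry after a lost acknowledgement gives at least once rather than exactly once.

```java
import java.util.ArrayList;
import java.util.List;

class LostAckDemo {
    /** Hypothetical in-memory broker that stores every write it receives. */
    static class ToyBroker {
        final List<String> log = new ArrayList<>();
        private int acksToDrop;

        ToyBroker(int acksToDrop) {
            this.acksToDrop = acksToDrop;
        }

        /** Appends the message; returns false when the ack is "lost" in transit. */
        boolean write(String message) {
            log.add(message);          // the write itself always succeeds
            if (acksToDrop > 0) {
                acksToDrop--;
                return false;          // the producer never sees this ack
            }
            return true;
        }
    }

    /** Sends once, then retries up to maxRetries times while no ack arrives. */
    static void send(ToyBroker broker, String message, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (broker.write(message)) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker(1);   // drop the first ack only
        send(broker, "alarm-1", 3);
        System.out.println(broker.log);        // the message appears twice
    }
}
```

With `maxRetries` set to 0 the same run leaves a single copy in the log, but the producer cannot tell whether the write succeeded.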

Figure: Kafka APIs

2. Consumer-side messaging

On the consumer side, the delivery guarantee depends on how offsets are committed.

The code for consumer consumption is as follows:

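import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");        // commit offsets automatically
props.put("auto.commit.interval.ms", "1000");   // at most once per second
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Start from the earliest offset when the group has no committed offset
props.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
try {
    while (true) {
        // Wait up to one second for new records
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}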

One of these parameters is enable.auto.commit.

If the consumer commits the offset before processing the messages, the result is at most once.

If it commits after processing, the result is at least once; this is what the default configuration does.

The default value of the Kafka consumer parameter enable.auto.commit is true, so the default consumer level is at least once, not exactly once.
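The effect of commit order can be seen in a small simulation. This is a sketch in plain Java, not Kafka client code: `CommitOrderDemo`, `run`, and the single simulated crash are assumptions made for illustration. The consumer crashes once, in the gap between committing and processing, and then restarts from the last committed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CommitOrderDemo {
    /**
     * Consumes records from the committed offset and crashes once, at index
     * crashAt, between the two steps. Returns everything that was processed
     * across the crash and the restart.
     */
    static List<String> run(List<String> records, int crashAt, boolean commitFirst) {
        List<String> processed = new ArrayList<>();
        int committed = 0;            // next offset to read after a restart
        boolean crashPending = true;
        while (committed < records.size()) {
            int offset = committed;   // resume from the committed position
            if (commitFirst) {
                committed = offset + 1;                    // step 1: commit
                if (crashPending && offset == crashAt) {   // crash before processing
                    crashPending = false;
                    continue;                              // restart
                }
                processed.add(records.get(offset));        // step 2: process
            } else {
                processed.add(records.get(offset));        // step 1: process
                if (crashPending && offset == crashAt) {   // crash before committing
                    crashPending = false;
                    continue;                              // restart
                }
                committed = offset + 1;                    // step 2: commit
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        System.out.println(run(records, 1, true));    // [a, c]: "b" is lost
        System.out.println(run(records, 1, false));   // [a, b, b, c]: "b" is repeated
    }
}
```

Committing first trades duplicates for possible loss, and committing last does the opposite; neither order alone gives exactly once.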

3. Exactly once

With the default configuration, both sides of Kafka work at least once. Does that mean Kafka messages will always be either lost or repeated? Is there no way to get exactly once?

Before Kafka 0.11.0.0 this was indeed impossible on the producer side, but Kafka 0.11.0.0 introduced the idempotent producer.

The same release also added support for transactions.

Idempotent producer

Kafka 0.11.0.0 introduced the idempotent producer mechanism: the producer may send the same message several times, but the broker writes it only once. Each message is deduplicated on the broker side, with little impact on Kafka's overhead.
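Broker-side deduplication can be pictured with a toy partition that remembers the last sequence number it accepted from each producer. This is a simplified sketch under assumptions: `ToyPartition` and `append` are hypothetical names, and the real broker logic also deals with producer epochs, batches, and out-of-order sequences, which are left out here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SequenceDedupDemo {
    /** Toy single-partition log that tracks the last sequence per producer. */
    static class ToyPartition {
        final List<String> log = new ArrayList<>();
        private final Map<Long, Integer> lastSequence = new HashMap<>();

        /** Appends only if this producer has not already sent this sequence. */
        boolean append(long producerId, int sequence, String message) {
            int last = lastSequence.getOrDefault(producerId, -1);
            if (sequence <= last) {
                return false;          // a retry of an earlier send: not rewritten
            }
            lastSequence.put(producerId, sequence);
            log.add(message);
            return true;
        }
    }

    public static void main(String[] args) {
        ToyPartition partition = new ToyPartition();
        partition.append(42L, 0, "alarm-on");
        partition.append(42L, 0, "alarm-on");    // retry of the same send
        partition.append(42L, 1, "alarm-off");
        System.out.println(partition.log);       // [alarm-on, alarm-off]
    }
}
```

Because the sequence is tracked per partition, this kind of check cannot detect a duplicate that lands in a different partition.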

How do you enable it? Set the producer parameter enable.idempotence to true.
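A minimal configuration sketch follows. The Kafka producer key is `enable.idempotence`; the class name `IdempotentProducerConfig` and the host names are placeholders. The three companion settings reflect the constraints the Kafka documentation states for idempotence (acks of all, retries greater than 0, at most 5 in-flight requests per connection), but check the documentation for the client version you run.

```java
import java.util.Properties;

class IdempotentProducerConfig {
    /** Builds producer settings with idempotence switched on. */
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka01:9092,kafka02:9092"); // placeholder hosts
        props.setProperty("enable.idempotence", "true");
        // Settings that idempotence depends on:
        props.setProperty("acks", "all");
        props.setProperty("retries", "3");                                   // any value > 0
        props.setProperty("max.in.flight.requests.per.connection", "5");     // must be <= 5
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // These properties would be passed to new KafkaProducer<>(props).
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```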

With multiple partitions, we also need writes to be atomic across partitions: messages written to several partitions either all succeed or are all rolled back.

This requires transactions: set transactional.id to a chosen string on the producer side.

In short, the idempotent producer only guarantees that there are no duplicate messages within a single partition, while transactions guarantee atomic writes across multiple partitions.
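As a hedged sketch, the transactional settings could be collected as shown below. `TransactionalProducerConfig` and the id `uav-alarm-writer` are hypothetical. The comment block lists the `KafkaProducer` transaction methods in the order they are normally called; it is an outline only, because the snippet does not depend on the Kafka client library.

```java
import java.util.Properties;

class TransactionalProducerConfig {
    /** Builds producer settings for transactional writes. */
    static Properties build(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka01:9092,kafka02:9092"); // placeholder hosts
        props.setProperty("transactional.id", transactionalId);
        props.setProperty("enable.idempotence", "true");  // transactions build on idempotence
        props.setProperty("acks", "all");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Outline of use with a KafkaProducer created from these settings:
    //   producer.initTransactions();       // once, before the first transaction
    //   producer.beginTransaction();
    //   producer.send(recordForTopicA);    // writes may span several partitions
    //   producer.send(recordForTopicB);
    //   producer.commitTransaction();      // or producer.abortTransaction() on failure

    public static void main(String[] args) {
        System.out.println(build("uav-alarm-writer").getProperty("transactional.id"));
    }
}
```

Each producer instance should use its own stable `transactional.id`, so that a restarted instance can be recognized as the same producer.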

Figure: transactions

So the producer side can achieve exactly once. What about the consumer side?

Because a consumer may not be able to read every message in a transaction, and messages may already have been deleted, transactions alone do not solve exactly once on the consumer side, and we may need to handle that logic ourselves. For example, exactly once is achievable if you disable automatic commits and manage offset commits yourself.
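One common way to manage offsets yourself is to store the offset together with the processing result, so that both are saved or neither is. The simulation below is a sketch under that assumption, not something the article prescribes: `Store`, `save`, and `run` are hypothetical names, and in a real system `save` would be a single database transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AtomicOffsetDemo {
    /** Hypothetical sink that saves a result and the next offset in one step. */
    static class Store {
        final List<String> results = new ArrayList<>();
        int nextOffset = 0;

        void save(String result, int offset) {
            results.add(result);
            nextOffset = offset + 1;
        }
    }

    /** Processes all records, crashing once at crashAt before anything is saved. */
    static List<String> run(List<String> records, int crashAt) {
        Store store = new Store();
        boolean crashPending = true;
        while (store.nextOffset < records.size()) {
            int offset = store.nextOffset;                        // resume from the stored offset
            String result = records.get(offset).toUpperCase();    // process the record
            if (crashPending && offset == crashAt) {
                crashPending = false;
                continue;                                         // restart: nothing was half-written
            }
            store.save(result, offset);                           // result and offset saved together
        }
        return store.results;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c"), 1)); // [A, B, C]
    }
}
```

The record at the crash point is processed twice, but its result is stored only once, which is what matters to downstream readers.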

Another option is to use Kafka’s own stream processing engine, Kafka Streams: set processing.guarantee=exactly_once to get exactly-once processing.
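A minimal configuration sketch for this setting is shown below. `StreamsExactlyOnceConfig` and the application id `uav-alarms` are placeholders. Newer Kafka Streams releases offer `exactly_once_v2` and deprecate the original `exactly_once` value, so check which value your version accepts.

```java
import java.util.Properties;

class StreamsExactlyOnceConfig {
    /** Builds Kafka Streams settings with exactly-once processing requested. */
    static Properties build(String applicationId) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", "kafka01:9092,kafka02:9092"); // placeholder hosts
        props.setProperty("processing.guarantee", "exactly_once");          // value used in the article
        return props;
    }

    public static void main(String[] args) {
        // These properties would be passed to new KafkaStreams(topology, props).
        System.out.println(build("uav-alarms").getProperty("processing.guarantee"));
    }
}
```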

