Kafka message delivery semantics – messages are neither lost nor duplicated

Contents

  • 1. Kafka message delivery semantics – messages are neither lost nor duplicated
    • 1.1. Introduction
    • 1.2. Producer side (message producer)
    • 1.3. Broker side (message receiver)
    • 1.4. Consumer side (message consumer)
    • 1.5. Exactly Once
      • 1.5.1. Message idempotency guarantee at the Producer end
      • 1.5.2. Transactional Guarantees
      • 1.5.3. The Consumer end
    • 1.6. References and related readings

Kafka message delivery semantics – messages are neither lost nor duplicated

Introduction

Kafka supports three message delivery semantics:

  • At most once — a message may be lost, but is never redelivered
  • At least once — a message is never lost, but may be redelivered
  • Exactly once — a message is neither lost nor duplicated, and is consumed exactly once

Note that the overall delivery semantics must be guaranteed by the Producer side and the Consumer side together.

Producer side (message producer)

A typical scenario: the producer sends a message to the broker and the network fails. The producer cannot tell whether the message was received. The network error may have occurred while the message was in transit, or after the broker had already received the message and sent the ack back to the producer.

In this case, all the producer can do is resend the message. The message may be duplicated, but at least once is guaranteed.

Since version 0.11.0, Kafka achieves exactly once on the producer side by assigning each producer a unique ID and embedding a sequence number in each message.

Message delivery also depends on the acks setting on the producer side, and on the replica count and the min.insync.replicas setting on the broker side. For example, with acks=all on the producer and min.insync.replicas=2 on the broker: if the number of in-sync replicas (ISR) drops below 2, the write is rejected with an exception; and if the message is not successfully written to at least 2 replicas, an exception is also raised and the message is considered not written successfully.
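
As a rough illustration, a producer configured this way might look like the following sketch (the broker address, topic, and error handling are placeholders; NotEnoughReplicasException is what the client surfaces when the ISR is smaller than min.insync.replicas):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");                // wait for all in-sync replicas to acknowledge
props.put("enable.idempotence", "true"); // producer-side dedup via PID + seqNum (0.11.0+)

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // get() surfaces broker-side errors, e.g. NotEnoughReplicasException
    // when fewer than min.insync.replicas replicas are in sync
    producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
} catch (Exception e) {
    e.printStackTrace(); // the message is considered not written
}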

Broker side (message receiver)

As mentioned above, acks=1 means that as soon as the leader replica writes the message successfully, the broker returns a response to the producer and the message is considered sent. If the leader crashes right after the write succeeds, before the write is replicated to the follower replicas, the partition's ISR list becomes empty. If unclean.leader.election.enable=true, an out-of-sync replica can be elected leader; the log is truncated and the message is lost. If unclean.leader.election.enable=false, the partition becomes unavailable, and the producer gets an exception when sending messages to it.

So if we set min.insync.replicas=2 and unclean.leader.election.enable=false on the broker side, and acks=all on the producer side, a message that was acknowledged as successfully sent can never be lost.
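
These broker-side settings can also be applied per topic at creation time. A minimal sketch using the Java AdminClient (the topic name, partition count, and replica count are made up for illustration):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address

try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("my-topic", 3, (short) 3) // 3 partitions, 3 replicas
            .configs(Map.of(
                    "min.insync.replicas", "2",
                    "unclean.leader.election.enable", "false"));
    admin.createTopics(List.of(topic)).all().get(); // throws if creation fails
} catch (Exception e) {
    e.printStackTrace();
}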

Consumer side (message consumer)

Each partition replica has its own log file (which holds the messages) with the same offset values across replicas. While a consumer is running, the offset it has consumed up to is kept in memory. If the consumer crashes, a rebalance occurs and the other consumers in the consumer group must take over its partitions and continue consuming.

A consumer can consume messages in one of two orders:

  1. The consumer reads a message, saves the offset, and then processes the message. Now suppose saving the offset succeeds but processing fails and the consumer crashes. The consumer that takes over can only resume from the last saved offset, so the unprocessed message is skipped. Messages may be lost, but at-most-once semantics are preserved.

  2. The consumer reads a message, processes it, and, once processing succeeds, saves the offset. If processing succeeds but the consumer crashes before the offset is saved, the consumer that takes over resumes from the last saved offset and the message is consumed again. Messages may be duplicated, but at-least-once semantics are preserved.

Neither guarantee is achieved through configuration; it is determined by your consumer code, and it is simply a matter of ordering. Code corresponding to the first case:

ConsumerRecords<String, String> messages = consumer.poll(Duration.ofMillis(100));
consumer.commitSync();  // save the offset first
processMsg(messages);   // then process; a crash here loses these messages

Code corresponding to the second case:

ConsumerRecords<String, String> messages = consumer.poll(Duration.ofMillis(100));
processMsg(messages);   // process first
consumer.commitSync();  // then save the offset; a crash before this line means re-consumption
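
Putting the second ordering into a complete loop with the actual Java client API, a minimal sketch (the broker address, group id, topic name, and processMsg are placeholders), with auto-commit disabled so the commit point is controlled explicitly:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // commit manually, after processing
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processMsg(record); // process first...
        }
        consumer.commitSync(); // ...then commit; a crash before this line means re-consumption
    }
}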

Exactly Once implementation principle

Here is how Exactly Once is implemented.

Message idempotency guarantee at the Producer end

Each producer is assigned a unique PID during initialization. Messages sent by the producer to a specific partition of a given topic carry a sequence number (seqNum for short) that increases monotonically from zero.

The broker keeps in memory the last seqNum for each (PID, TopicPartition) pair and validates every message received from the producer. A seqNum is accepted only if it is exactly one greater than the last committed seqNum. A larger gap means messages were lost in between; a smaller value means the message is a duplicate.
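
Conceptually, the broker-side check behaves like the following sketch (illustrative Java only, not the actual broker implementation; all names are made up):

import java.util.HashMap;
import java.util.Map;

// last accepted seqNum per (PID, TopicPartition), kept in memory (illustrative)
Map<String, Long> lastSeq = new HashMap<>();

boolean accept(String pid, String topicPartition, long seqNum) {
    String key = pid + "/" + topicPartition;
    long last = lastSeq.getOrDefault(key, -1L);
    if (seqNum == last + 1) {        // exactly one greater: accept and advance
        lastSeq.put(key, seqNum);
        return true;
    } else if (seqNum > last + 1) {  // gap: messages in between were lost
        throw new IllegalStateException("sequence gap: messages lost");
    } else {                         // smaller or equal: duplicate send, drop it
        return false;
    }
}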

The above only covers a single producer within one session. If the producer dies and restarts, it is assigned a new PID, so duplicates can no longer be detected across sessions. To solve this, Kafka introduced Transactional Guarantees.

Transactional Guarantees

Kafka’s transactional guarantee means that a set of messages sent to multiple TopicPartitions either all succeed or all fail together.

Why does this feature exist? Consider an example: after a user books a flight, the order status changes and the seat is marked as taken. These are two messages, so the transactional guarantee here means that sending one message to the order-status topic and another to the airplane-seat topic must succeed or fail as a unit.

This feature also allows the consumer's offset commit (which itself produces messages to the broker) to be bound together with the messages sent by the producer. The user provides a globally unique TransactionalId, which is mapped to the PID. This solves the cross-session problem after a producer failure: the new producer instance carrying the same TransactionalId takes over the previous PID.
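
Continuing the flight-booking example, a transactional send to the two topics would look roughly like this sketch (the topic names and the transactional.id value are made up):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "booking-service-1"); // the globally unique TransactionalId

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // maps the TransactionalId to a PID, recovering it across restarts
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("order-status", "order-42", "PAID"));
    producer.send(new ProducerRecord<>("plane-seats", "seat-17A", "TAKEN"));
    producer.commitTransaction(); // both messages become visible, or neither does
} catch (Exception e) {
    producer.abortTransaction();  // neither message is visible to read_committed consumers
}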

The Consumer end

The transactional guarantee above applies only to the producer side; it cannot be extended to the consumer side, for the following reasons:

  1. Compacted topics: some messages in a transaction may be overwritten by newer messages with the same key
  2. A transaction may span two log segments; if the older segment is deleted, part of the transaction's messages is lost
  3. The consumer may seek to an arbitrary position within a transaction and thus miss its initial messages
  4. The consumer may not consume from all the TopicPartitions that participated in the transaction

If you consume from a Kafka topic and write the results back to another Kafka topic, you can bind saving the processing result and saving the offset together as one transaction, which guarantees that processing the message and committing the offset either both succeed or both fail.
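
A sketch of this consume-transform-produce binding (it assumes a transactional producer as configured above; transform is a made-up processing function, and the consumer should be configured with isolation.level=read_committed so it only sees committed transactions):

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

try {
    producer.beginTransaction();
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output-topic", transform(record.value())));
        // remember the next offset to consume for this partition
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    // the offset commit rides in the same transaction as the produced messages
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction(); // neither the output nor the offset commit takes effect
}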

If you want to save the result of processing to an external system, a two-phase commit would be required, which is cumbersome. The better approach is to manage the offset yourself: save it in the same place as the processing result and update the two together atomically. See the HDFS example in Kafka Connect.
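
A sketch of self-managed offsets (loadOffsetFromStore and saveResultAndOffsetAtomically are hypothetical placeholders for whatever external system stores your results, such as a database transaction or an HDFS file):

import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(List.of(tp));               // assign manually instead of subscribing
consumer.seek(tp, loadOffsetFromStore(tp)); // resume from the offset stored with the results

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // write the result and the next offset in one atomic operation,
        // so result and offset can never disagree after a crash
        saveResultAndOffsetAtomically(process(record.value()), record.offset() + 1);
    }
}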

References and related readings

  • Message Delivery Semantics
  • KIP-98 - Exactly Once Delivery and Transactional Messaging
  • Kafka Connect Details