Message loss scenarios

If the Kafka Producer sends a message in "fire and forget" style, calling the producer.send(msg) method directly, the method returns immediately, but a successful return does not mean the message was delivered successfully. (See "First encounter with the Kafka producer" for how messages are sent.)

If network jitter occurs while the message is in flight, the message can be lost. The Producer may also send messages that violate the Broker's requirements, such as exceeding the maximum message size the Broker accepts. (Oversized messages have actually come up in production; the fix was to split each message into smaller parts before sending and then send the parts in sequence.)

The solution is for the Producer to send messages with producer.send(msg, callback). The callback tells us whether the message was actually committed successfully, and when delivery fails, the callback code can perform fault tolerance and remediation.

For example, if a message is lost due to network jitter, the Producer can retry it. If the message itself is invalid, the message format can be fixed and the message resent. With the callback-based send API, the Producer discovers failed sends promptly and handles them accordingly.
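A minimal sketch of sending with a callback might look like this (the broker address, topic, key, and value are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        // Delivery failed: log it and remediate (retry, fix the message format, ...).
        exception.printStackTrace();
    } else {
        System.out.printf("Delivered to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});
producer.close();
```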

Consumer data loss

Data loss on the Consumer side mainly happens when the consumer pulls a batch of messages and commits the consumption offset, but then fails suddenly (for example, crashes) before finishing processing the messages. After the consumer restarts, it resumes consuming from the position after the previously committed offset, so the messages that were pulled but not fully processed are never processed again; in other words, the consumer has lost those messages.

The fix for message loss on the Consumer side is also simple: change the timing of the offset commit so that the Consumer commits the corresponding offset only after it has finished processing a batch of messages. This way, even if an exception occurs while processing a message, no offset has been committed, so the messages will be pulled again from the last committed offset on the next consumption, and no message is lost.

Concretely, when the Consumer consumes messages, automatic offset commits are turned off and the application commits offsets manually.
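A minimal sketch of this pattern (the broker address, group id, topic, and process() are placeholder names):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address
props.put("group.id", "demo-group");              // placeholder group id
props.put("enable.auto.commit", "false");         // disable automatic offset commits
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("demo-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical business logic
    }
    consumer.commitSync(); // commit the offset only after the whole batch is processed
}
```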

Broker data loss

The loss of data at the Broker can occur in the following cases:

A Broker that is not fully in sync (an "unclean" Broker) is elected as the new Leader. In this case, the messages that existed only on the former Leader are lost. This can be prevented by forbidding unclean Brokers from being elected Leader.

For efficiency and throughput, Kafka writes messages to the page cache rather than persisting them directly to disk, leaving the flushing to the operating system's scheduling. If the Broker crashes while some messages are still in memory pages and not yet persisted to disk, those messages are lost after the restart. The multi-replica mechanism can prevent this kind of loss.

Best practices for avoiding message loss

Instead of using producer.send(msg), use the producer.send(msg, callback) method.

Set acks = all. The acks parameter is a Producer-side parameter that defines when a message counts as "committed". Setting it to all means all in-sync replicas must receive the message before it is considered "committed", which is the strongest definition of "committed".

Set retries to a larger value. retries is the number of times the Producer retries after a send fails. When a transient fault such as network jitter occurs, the retry mechanism resends the message and avoids losing it.

Set unclean.leader.election.enable = false. This Broker-side parameter controls which Brokers are eligible to be elected Leader of a partition. If a Follower Broker that has fallen too far behind the Leader becomes the new Leader, messages will inevitably be lost. Setting this parameter to false prevents that from happening.

Set replication.factor >= 3, so that each partition has at least 3 replicas. Redundancy guards against message loss.

Set min.insync.replicas > 1. This Broker-side parameter controls how many replicas a message must be written to before it counts as "committed". Setting it to a value greater than 1 improves message durability.

Make sure replication.factor > min.insync.replicas. If the two are equal, then as soon as one replica goes down, the whole partition stops working. A common recommendation is replication.factor = min.insync.replicas + 1.

Set enable.auto.commit = false on the Consumer side to disable automatic offset commits, and commit offsets manually.
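Taken together, a rough sketch of these settings (the value min.insync.replicas=2 is illustrative; each setting applies at the level noted in the comments):

```properties
# Producer side
acks=all
retries=2147483647

# Broker side (server.properties), or overridden per topic
unclean.leader.election.enable=false
min.insync.replicas=2

# Topic level: create the topic with at least 3 replicas,
# keeping replication.factor > min.insync.replicas
# e.g. kafka-topics --create ... --replication-factor 3

# Consumer side
enable.auto.commit=false
```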

Exactly-once consumption

By default, Kafka currently provides "at least once" message reliability, meaning messages are never lost. As the previous section showed, if the Producer fails to send a message, it can retry. If the Broker fails to return a response to the Producer (because of network jitter, for example), the Producer retries and sends the original message again. This is why Kafka provides at-least-once delivery by default, and also why messages can be delivered repeatedly.

To guarantee "at most once" delivery instead, Producer retries can be disabled. But without retries, a failed write is lost forever. Is there a way to send messages without loss and without repeated consumption? In other words, even if the Producer sends some messages repeatedly, can the Broker automatically deduplicate them?

Kafka actually guarantees exactly-once message consumption through two mechanisms:

Idempotence

Transactions

Idempotence

Put simply, idempotence means that calling an interface multiple times produces the same result as calling it once. In Kafka, the Producer is not idempotent by default; Kafka introduced this feature in version 0.11.0.0. Setting the enable.idempotence parameter to true makes the Producer idempotent, and Kafka then deduplicates messages automatically. To implement Producer idempotence, Kafka introduces the concepts of producer ID (PID) and sequence number.

When a producer instance is created, it is assigned a PID that is completely transparent to the user. For each PID, every partition that receives messages has a corresponding sequence number that increases monotonically from 0. Each time the producer sends a message, it increments the sequence number for **<PID, partition>** by one.

For each <PID, partition> pair, the Broker maintains a sequence number SN_old in memory. For each message the producer sends, the Broker compares the message's sequence number SN_new against SN_old and handles it as follows (see the sketch after this list):

The Broker accepts the message only if SN_new is exactly one greater than SN_old, that is, if SN_new = SN_old + 1.

SN_new < SN_old + 1 indicates that the message is a duplicate write, and the Broker discards it.

SN_new > SN_old + 1 indicates that some messages in the sequence have not yet been written, which may mean message loss; the corresponding producer throws an OutOfOrderSequenceException.
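The check can be summarized with the following sketch (illustrative only, not Kafka's actual implementation; lastSequence, pidAndPartition, and append() are hypothetical names):

```java
long snOld = lastSequence.get(pidAndPartition); // last sequence number seen for <PID, partition>
if (snNew == snOld + 1) {
    append(message);                          // in order: accept the message
    lastSequence.put(pidAndPartition, snNew); // advance the stored sequence number
} else if (snNew <= snOld) {
    // SN_new < SN_old + 1: a duplicate write, silently discarded
} else {
    // SN_new > SN_old + 1: a gap in the sequence, possible message loss
    throw new OutOfOrderSequenceException("sequence gap for " + pidAndPartition);
}
```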

Note: the sequence number is scoped to <PID, partition>, which means an idempotent producer can only guarantee that messages are not duplicated within a single partition of a single topic. Moreover, idempotence only holds within a single session, which can be understood as one run of the Producer process; it does not hold across sessions. When the Producer process restarts, the idempotence guarantee is lost.
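Enabling the idempotent Producer is a one-line configuration change; a minimal sketch (the broker address is a placeholder):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true"); // turn on the idempotent producer
// With idempotence enabled, Kafka requires compatible values of acks, retries,
// and max.in.flight.requests.per.connection, and rejects conflicting settings.
Producer<String, String> producer = new KafkaProducer<>(props);
```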

Transactions

Idempotence does not work across multiple partitions, and Kafka transactions make up for this. Kafka has supported transactions since version 0.11, mainly at the read committed isolation level. Transactions ensure that multiple messages are written to the target partitions atomically, and that consumers only see messages committed by successful transactions.

Producer-side configuration

A transactional Producer guarantees that messages are written atomically to multiple partitions: a batch of messages either all succeed or all fail. Moreover, after a transactional Producer restarts, Kafka guarantees that the messages it sends are processed exactly once. Enabling a transactional Producer requires the following configuration:

As with the idempotent Producer, set enable.idempotence = true.

Set transactional.id on the Producer side; it is best to give it a meaningful name.

A Producer configured as transactional can call the transaction APIs: initTransactions, beginTransaction, commitTransaction, and abortTransaction, which correspond to transaction initialization, transaction start, transaction commit, and transaction abort respectively.

```java
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (KafkaException e) {
    producer.abortTransaction();
}
```
In the code above, the transactional Producer guarantees that record1 and record2 either both commit successfully or both fail. In fact, even when a transaction fails, Kafka still writes its messages to the underlying log, so those messages remain visible to the Consumer. The Consumer therefore needs its own configuration to correctly read messages sent by a transactional Producer.

Consumer-side configuration

When reading messages sent by a transactional Producer, the isolation.level parameter on the Consumer side sets the transaction isolation level, which determines which messages the Consumer can read. It takes the following values (a configuration sketch follows the list):
read_uncommitted: the default value. The Consumer can read any message Kafka has written, whether or not the transactional Producer has committed its transaction. Obviously, if a transactional Producer is enabled, the Consumer should not use this value, otherwise the transaction is pointless.
read_committed: the Consumer reads only messages that a transactional Producer wrote in successfully committed transactions; messages written by non-transactional Producers, however, are all visible to the Consumer.
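A minimal sketch of the Consumer-side setting (the broker address and group id are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address
props.put("group.id", "demo-group");              // placeholder group id
props.put("isolation.level", "read_committed");   // hide uncommitted or aborted transactional messages
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```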

Conclusion

Kafka provides two mechanisms for consuming messages exactly once: the idempotent Producer and the transactional Producer.
The idempotent Producer can only guarantee idempotence within a single session and a single partition.
The transactional Producer can guarantee idempotence across partitions and across sessions.
The transactional Producer is more powerful, but also less efficient.