Supplement

1. Summary of Kafka

1. Message queues

1. Application scenarios of traditional message queues

2. Two modes of message queuing

  • Point-to-point mode:

One-to-one: consumers actively pull messages, and a message is removed once it has been received. The producer sends a message to the queue; although the queue may have one or more consumers, a message is deleted after one consumer consumes it, so other consumers cannot consume it (a message corresponds to only one consumer; my own understanding).

  • Publish/subscribe mode:

One-to-many: consumers do not delete messages after consuming them. A producer publishes a message to a topic, and the message is consumed by multiple consumers (subscribers). (A message published to a topic can be received or pulled for consumption by every group that subscribes to that topic; my own understanding.)

2. Definition of Kafka

Kafka is a publish/subscribe message queue that is mainly used in real-time big data processing.

3. Kafka architecture

  • To facilitate scaling and improve throughput, a topic is divided into multiple partitions
  • To go with the partition design, the concept of consumer group is introduced; each consumer in a group consumes partitions in parallel (a given message can be consumed by only one consumer within the group)
  • To improve availability, several replicas are added to each partition

  1. Producer: the message producer, the client that sends messages to the Kafka broker.
  2. Consumer: the message consumer, the client that fetches messages from the Kafka broker.
  3. Consumer group: a group of consumers. Consumers in a consumer group are responsible for consuming messages from different partitions; a partition can be consumed by only one consumer within the group. Consumer groups do not affect each other. Every consumer belongs to some consumer group, so the consumer group is the logical subscriber.
  4. Broker: a Kafka server is a broker. A cluster consists of multiple brokers, and a single broker can hold many topics.
  5. Topic: can be understood as a queue; producers and consumers are both oriented to the same topic.
  6. Partition: for scalability, a large topic can be distributed across different brokers (the partitions spread across different brokers make up the topic); each partition is an ordered queue.
  7. Replica: so that partition data on a node is not lost when the node fails and Kafka keeps working, Kafka provides a replication mechanism. Each partition of a topic has several replicas: one leader and several followers.
  8. Leader: the primary replica of each partition; it is the replica that producers send data to and that consumers consume data from.
  9. Follower: synchronizes data from the leader in real time. If the leader fails, a follower becomes the new leader.

2. Kafka installation

  1. Installing Kafka
  2. Installing Kafka monitoring tools

3. Kafka CLI operations

1) View all topics in the current server

[mayi@mayi101 kafka]$ bin/kafka-topics.sh --zookeeper mayi101:2181/kafka --list

2) Create a topic

[mayi@mayi101 kafka]$ bin/kafka-topics.sh --zookeeper mayi:2181/kafka \
--create --replication-factor 3 --partitions 1 --topic first
--topic defines the topic name, --replication-factor defines the number of replicas, --partitions defines the number of partitions.

3) Delete topic

[mayi@mayi kafka]$ bin/kafka-topics.sh --zookeeper mayi101:2181/kafka \
--delete --topic first
delete.topic.enable=true must be set in server.properties; otherwise the topic is only marked for deletion.

4) Send a message

[mayi@mayi101 kafka]$ bin/kafka-console-producer.sh \
--broker-list mayi101:9092 --topic first
>hello world

5) Consume messages

[mayi@mayi101 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server mayi101:9092 --from-beginning --topic first
--from-beginning: read all existing data in the topic from the beginning.

6) Check the details of a Topic

[mayi@mayi101 kafka]$ bin/kafka-topics.sh --zookeeper mayi101:2181/kafka \
--describe --topic first

7) Change the number of partitions

[mayi@mayi101 kafka]$ bin/kafka-topics.sh --zookeeper mayi101:2181/kafka --alter --topic first --partitions 6

4. Kafka in depth

For a deeper understanding of Kafka, the following notes can be used as a reference.

1. Kafka workflow

  • Messages in Kafka are categorized by topic, with producers producing messages and consumers consuming messages, both topic-oriented.
  • Topic is a logical concept, while partition is a physical concept. Each partition corresponds to a log file, which stores the data produced by producers. Data produced by a producer is continuously appended to the end of the log file, and each record has its own offset. Each consumer in a consumer group records in real time which offset it has consumed, so that after a failure it can resume consuming from where it left off.

2. Kafka file storage mechanism

Kafka storage reference

  • Because messages produced by producers are constantly appended to the end of the log file, Kafka uses a sharding and indexing mechanism to prevent inefficient data lookup in an overly large log file: each partition is divided into multiple segments. Each segment corresponds to two files, an .index file and a .log file. These files sit in a folder named after the topic name plus the partition number. For example, if the topic first has three partitions, the corresponding folders are first-0, first-1, and first-2.
-rw-rw-r--. 1 mayi mayi 10485760 Feb 26 11:03 00000000000000000000.index
-rw-rw-r--. 1 mayi mayi        0 Feb 23       00000000000000000000.log
-rw-rw-r--. 1 mayi mayi 10485756 Feb 26 11:03 00000000000000000000.timeindex

Index and log file details

  • .index: maps message offsets to physical positions in the .log file (a lookup sketch follows this list).
  • .log: stores the actual message content.
  • .timeindex: maps timestamps to relative offsets.
  • .snapshot: records the producer's transaction information.
  • leader-epoch-checkpoint: saves the offset at which each leader started writing messages; it is updated periodically.
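
The .index file is sparse: it stores only some (relative offset, byte position) pairs, and a lookup first finds the largest indexed entry not greater than the target offset, then scans the .log file from that position. A minimal Java sketch of that lookup idea follows; it is an illustration only, not Kafka's actual code, and the offsets and byte positions are made up.

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class SegmentIndexSketch {

    // Sparse index of one segment: relative offset -> byte position in the .log file
    private final NavigableMap<Long, Long> index = new TreeMap<>();
    private final long baseOffset; // the offset encoded in the segment file name, e.g. 00000000000000000000

    public SegmentIndexSketch(long baseOffset) {
        this.baseOffset = baseOffset;
    }

    public void addEntry(long relativeOffset, long bytePosition) {
        index.put(relativeOffset, bytePosition);
    }

    // Returns the byte position in the .log file from which to scan for the target offset
    public long lookup(long targetOffset) {
        long relative = targetOffset - baseOffset;
        Map.Entry<Long, Long> floor = index.floorEntry(relative); // largest indexed offset <= target
        return floor == null ? 0L : floor.getValue();
    }

    public static void main(String[] args) {
        SegmentIndexSketch segment = new SegmentIndexSketch(0L);
        segment.addEntry(0, 0);    // the index is sparse: only every few records gets an entry
        segment.addEntry(4, 512);
        segment.addEntry(8, 1024);
        // Offset 6 is not indexed, so scanning starts from byte 512 (the entry for offset 4)
        System.out.println(segment.lookup(6)); // prints 512
    }
}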

5. Kafka producers

1. Partition policies

1) Reasons for partitioning

  • (1) Easy to scale in a cluster: each partition can be sized to fit the machine it resides on, and a topic can be composed of multiple partitions, so the cluster can accommodate data of any size.
  • (2) Improved concurrency: reads and writes can be performed at the partition level.

2) Partitioning rules

We need to encapsulate the data sent by the producer into a ProducerRecord object.

  • (1) When a partition is specified, the specified value is used directly as the partition value;
  • (2) If no partition is specified but a key is present, the partition value is obtained by taking the key's hash value modulo the topic's number of partitions;
  • (3) When neither a partition nor a key is given, a random integer is generated on the first call (and incremented on each subsequent call), and the partition value is obtained by taking this value modulo the number of partitions available for the topic. This is the round-robin algorithm (a simplified sketch follows this list).
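
A simplified Java sketch of those three rules, for illustration only; it is not Kafka's actual DefaultPartitioner, and the hash handling and counter are stand-ins.

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionChoiceSketch {

    // Stand-in for the counter kept for the keyless round-robin case,
    // initialized to a random integer on the first call
    private final AtomicInteger counter =
            new AtomicInteger(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));

    public int choosePartition(Integer explicitPartition, String key, int numPartitions) {
        // (1) A partition was specified explicitly: use it as-is
        if (explicitPartition != null) {
            return explicitPartition;
        }
        // (2) No partition, but a key is present: hash of the key modulo the number of partitions
        if (key != null) {
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
        // (3) Neither partition nor key: round-robin, incrementing the counter on every call
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}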

2. Data reliability assurance

To ensure that data sent by the producer reliably reaches the specified topic, each partition of the topic must send an ack (acknowledgement) to the producer after receiving the data. If the producer receives the ack, it sends the next round of data; otherwise, it resends the current data.

Replica synchronization strategy, two options:

  • Send the ack after half of the followers have finished synchronizing: lower latency, but tolerating the failure of n nodes requires 2n+1 replicas.
  • Send the ack after all followers have finished synchronizing: tolerating the failure of n nodes requires only n+1 replicas, at the cost of higher latency. Kafka chose this second option.

ISR (In-Sync Replica set)

  • With the second option, imagine the following scenario: the leader receives the data and all followers start synchronizing it, but one follower cannot finish synchronizing because of some fault. The leader would have to wait until that follower finished before sending the ack. How is this solved?
  • The leader maintains a dynamic In-Sync Replica set (ISR): the set of followers that are in sync with the leader. When the followers in the ISR finish synchronizing, the leader sends the ack to the producer. If a follower fails to synchronize data with the leader for longer than the threshold set by the replica.lag.time.max.ms parameter, it is removed from the ISR. When the leader fails, a new leader is elected from the ISR.

Ack response mechanism

Configuring acks Parameters

acks:

  • 0: the producer does not wait for the broker's ack. This provides the lowest latency: the broker returns as soon as it receives the message, before it has been written to disk. If the broker fails, data may be lost.
  • 1: the producer waits for the broker's ack; the partition's leader returns the ack after writing the data to its local disk. If the leader fails before the followers finish synchronizing, data may be lost.

acks = 1: data loss case

  • -1 (all): the producer receives the ack only after the partition's leader and all its followers (in the ISR) have successfully written the data. However, if the leader fails after the followers finish synchronizing but before the broker sends the ack, data duplication can occur.

acks = -1: data duplication case

Summary: Duplication or data loss problems occur only when the Leader fails

  • Data loss: ack returned + data not synchronized to followers => data is lost
  1. acks = 0: the producer does not wait for the ack; if the leader fails before the data is synchronized to the followers, the data is lost.
  2. acks = 1: the leader returns the ack after writing to its local disk and then fails before the data is synchronized to the followers; the data is lost.
  • Data duplication: data synchronized + no ack returned (producer resends) => data is duplicated
  1. acks = -1: the leader fails after synchronizing the data to the followers but before returning the ack; the producer resends the data, which is then duplicated. (See the configuration sketch below.)
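
A small producer-configuration sketch contrasting the acks settings described above; the broker address is a placeholder reused from the later examples.

import java.util.Properties;

public class AcksConfigSketch {

    // Builds producer properties for a given acks setting
    public static Properties producerProps(String acks) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        // "0"  : do not wait for the broker's ack            -> lowest latency, data may be lost
        // "1"  : leader acks after writing to its local disk -> lost if leader fails before followers sync
        // "all": equivalent to -1, leader and ISR followers persist the data -> no loss, duplicates possible
        props.put("acks", acks);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}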

How Kafka handles replica failures

HW and LEO in the log file: LEO (Log End Offset) is the last offset of each replica; HW (High Watermark) is the largest offset visible to consumers, i.e. the smallest LEO among the replicas in the ISR.

  • Followers of failure

If a follower fails, it is temporarily removed from the ISR. After it recovers, the follower reads the last HW recorded on its local disk, truncates the part of its log file above the HW, and synchronizes from the leader starting at the HW. Once the follower's LEO is greater than or equal to the partition's HW, that is, once the follower has caught up with the leader, it can rejoin the ISR.

  • Leader failure

After the leader fails, a new leader is elected from the ISR. To ensure data consistency across replicas, the remaining followers truncate the parts of their log files above the HW and then synchronize from the new leader.

Note: This only guarantees data consistency between replicas, not data loss or duplication.

Exactly Once semantics

  • Setting the producer's acks to -1 ensures that no data is lost between the producer and the server, which is At Least Once semantics. Conversely, setting acks to 0 ensures that each message is sent only once, which is At Most Once semantics.

  • At Least Once guarantees that data is not lost, but not that it is not duplicated; At Most Once guarantees that data is not duplicated, but not that it is not lost. For very important data such as transaction data, downstream consumers require data that is neither duplicated nor lost, i.e. Exactly Once semantics. Before version 0.11 there was nothing Kafka could do about this except guarantee that data was not lost and have downstream consumers deduplicate globally. With multiple downstream applications, each has to deduplicate separately, which severely hurts performance.

  • Kafka 0.11 introduced a major feature: idempotence. Idempotent means that no matter how many times Producer sends repeated data to the Server, the Server persists only one. Idempotence combined with At Least Once semantics forms Kafka’s Exactly Once semantics. That is:

  • At Least Once + Idempotence = Exactly Once

  • To enable idempotence, simply set enable.idempotence to true in the producer configuration. Kafka's idempotence essentially moves the deduplication that previously had to be done downstream up to the broker. A producer with idempotence enabled is assigned a PID during initialization, and messages sent to the same partition carry a sequence number. The broker caches the <PID, Partition, SeqNumber> triple and persists only one message for a given primary key.

  • However, the PID changes after a restart, and different partitions have different primary keys, so idempotence cannot guarantee Exactly Once across partitions or across sessions. (A minimal configuration sketch follows.)
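
A minimal sketch of enabling idempotence on a producer; the broker address and topic reuse the placeholders from the other examples in this post.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class IdempotentProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("enable.idempotence", "true"); // turns on the PID + sequence-number deduplication
        props.put("acks", "all");                // At Least Once; together with idempotence this gives Exactly Once per partition/session
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("first", "key", "value"));
        }
    }
}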

6. Kafka consumers

Consumption patterns

  • The consumer uses pull mode to read data from the broker.
  • The push model is hard to adapt to consumers with different consumption rates, because the message sending rate is determined by the broker. The broker's goal is to deliver messages as fast as possible, but this easily overwhelms consumers that cannot keep up, typically showing up as denial of service and network congestion. The pull model lets each consumer consume messages at a rate suited to its own capacity.
  • The downside of the pull model is that if Kafka has no data, the consumer may spin in a loop that keeps returning empty results. For this reason, Kafka consumers pass a timeout when polling for data: if no data is available, the consumer waits up to that long before returning.

Partition allocation policy

A consumer group contains multiple consumers and a topic contains multiple partitions, so partition assignment is unavoidable: it must be decided which consumer consumes which partition. Kafka has two assignment strategies, RoundRobin and Range.
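
A small configuration sketch for choosing the assignment strategy on the consumer side. The two assignor class names ship with the Kafka client library, while the broker address and group name are placeholders reused from the later examples.

import java.util.Properties;

public class AssignmentStrategySketch {

    public static Properties consumerProps(boolean roundRobin) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        // Range (the default) assigns contiguous partition ranges per topic;
        // RoundRobin spreads all subscribed partitions across the group one by one.
        props.put("partition.assignment.strategy", roundRobin
                ? "org.apache.kafka.clients.consumer.RoundRobinAssignor"
                : "org.apache.kafka.clients.consumer.RangeAssignor");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}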

Offset maintenance

  • Since a consumer may have power outages and other faults in the process of consumption, after recovery, it needs to continue consumption from the position before the fault, so it needs to record the offset to which it has consumed in real time, so that it can continue consumption after recovery.
  • Before version 0.9 of Kafka, consumers stored offsets in Zookeeper by default. Starting with version 0.9, consumers stored offsets in a built-in Kafka topic called __consumer_offsets by default.

Kafka reads and writes data efficiently

  1. Sequential disk writes

Kafka's producers write data to a log file, and the writes are sequential appends to the end of the file. Data from the official site shows that on the same disk, sequential writes can reach 600 MB/s while random writes reach only about 100 KB/s. This is due to the mechanics of the disk: sequential writing is fast because it saves a great deal of head-seek time.

  2. Use of the page cache

Kafka persists data directly to the page cache, which has the following benefits:

  • The I/O Scheduler assembles sequential small blocks of writes into large physical writes to improve performance
  • The I/O Scheduler tries to reorder some writes to reduce disk head movement time
  • Make full use of all free memory (non-JVM memory). If you use application-layer Cache (that is, JVM heap memory), you add to the GC burden
  • Read operations can be performed directly in the Page Cache. If the consumption and production rates are comparable, data does not even need to be exchanged through physical disks (directly through the Page Cache)
  • If the process is restarted, the Cache in the JVM is invalidated, but the Page Cache is still available

Although persisting only to the page cache means data could be lost if the machine goes down, this can be addressed by Kafka's replication mechanism. Forcing data to be flushed from the page cache to disk to guarantee no loss in that case would degrade performance.

  3. Zero-copy technology

7. Zookeeper in Kafka

One broker in the Kafka cluster is elected as the Controller, which manages brokers coming online and going offline, the assignment of partition replicas for all topics, leader election, and so on.

Controller management is dependent on Zookeeper.

8. Kafka transactions

Kafka has introduced transaction support since version 0.11. Transactions guarantee that Kafka can produce and consume across partitions and sessions with Exactly Once semantics, with either all success or all failure.

1. Producer transactions

  • In order to implement transactions across partitions and sessions, a globally unique Transaction ID needs to be introduced and the PID obtained by Producer is bound to the Transaction ID. This allows the original PID to be retrieved from the ongoing Transaction ID when the Producer restarts.
  • To manage transactions, Kafka introduces a new component, the Transaction Coordinator. The producer interacts with the Transaction Coordinator to obtain the task status corresponding to its Transaction ID. The Transaction Coordinator is also responsible for writing all transaction state to an internal Kafka topic, so that even if the whole service restarts, the saved transaction state can be recovered and in-flight transactions can continue. (A minimal usage sketch follows.)
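
A minimal sketch of the producer transaction API. The transactional.id value, broker address, and topic are placeholders reused from the other examples; setting transactional.id implicitly enables idempotence.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

import java.util.Properties;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("transactional.id", "my-transactional-id"); // the globally unique Transaction ID
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();      // registers the Transaction ID with the Transaction Coordinator
        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("first", Integer.toString(i), Integer.toString(i)));
            }
            producer.commitTransaction();  // all messages in the transaction become visible atomically
        } catch (KafkaException e) {
            producer.abortTransaction();   // none of the messages in the transaction are exposed
        } finally {
            producer.close();
        }
    }
}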

2. Consumer transactions

  • The transaction mechanism above is designed mainly from the producer's point of view. For consumers the transaction guarantee is comparatively weak; in particular, it cannot guarantee that committed messages are consumed exactly once. This is because a consumer can access arbitrary positions through offsets, and different segment files have different lifecycles, so after a restart some messages belonging to the same transaction may already have been deleted.
  • To achieve exactly-once consumption on the consumer side, the consumption of data and the committing of the offset must be bound into a single atomic operation. In that case Kafka's offset needs to be stored in a custom medium that supports transactions (such as MySQL), as sketched below.
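
A rough sketch of that idea: offsets are read from and written to MySQL inside the same database transaction as the processed results. The JDBC URL, the three empty helper methods, the topic, and the partition are all assumptions for illustration.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Collections;
import java.util.Properties;

public class TransactionalOffsetConsumerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false"); // Kafka no longer owns the offset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/demo", "user", "pwd")) {
            conn.setAutoCommit(false);
            TopicPartition tp = new TopicPartition("first", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, loadOffsetFromDb(conn, tp)); // resume from the offset stored in MySQL

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    saveResult(conn, record);                  // business write
                    saveOffset(conn, tp, record.offset() + 1); // offset write, same DB transaction
                }
                conn.commit(); // result and offset succeed or fail together
            }
        }
    }

    // Placeholder: SELECT the stored offset for this partition
    private static long loadOffsetFromDb(Connection conn, TopicPartition tp) { return 0L; }
    // Placeholder: INSERT the processed result, without committing
    private static void saveResult(Connection conn, ConsumerRecord<String, String> record) {}
    // Placeholder: UPSERT the offset for this partition, without committing
    private static void saveOffset(Connection conn, TopicPartition tp, long offset) {}
}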

Kafka API

Producer API

Message Sending Process

Kafka’s Producer sends messages asynchronously. In the process of sending the message, there are two threads involved — the main thread and the Sender thread — and a thread shared variable — RecordAccumulator. The main thread sends messages to the RecordAccumulator, and the Sender thread continuously pulls messages from the RecordAccumulator to send to the Kafka Broker.

(Figure: Kafka message sending process)

Related parameters:

  • batch.size: the sender sends data only after it has accumulated batch.size bytes of data.
  • linger.ms: if the accumulated data does not reach batch.size in time, the sender waits linger.ms and then sends the data anyway.

Asynchronous send

Import the dependency

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.4.1</version>
</dependency>

Write code:

  • KafkaProducer: You need to create a producer object that sends data
  • ProducerConfig: gets the required set of configuration parameters
  • ProducerRecord: Each piece of data is encapsulated into a ProducerRecord object

(1) API without a callback

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers"."hadoop102:9092");// Kafka cluster, broker-list
        props.put("acks"."all");
        props.put("retries".1);// Number of retries
        props.put("batch.size".16384);// Batch size
        props.put("linger.ms".1);// Wait time
        props.put("buffer.memory".33554432);//RecordAccumulator Buffer size
        props.put("key.serializer"."org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer"."org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))); } producer.close(); }}Copy the code

(2) API with a callback

The callback function is called asynchronously when producer receives an ACK. The method has two parameters: RecordMetadata and Exception. If Exception is null, the message was successfully sent; if Exception is not null, the message failed to be sent.

Note: Message sending failures are automatically retried and do not require manual retries in the callback function.

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers"."hadoop102:9092");// Kafka cluster, broker-list
        props.put("acks"."all");
        props.put("retries".1);// Number of retries
        props.put("batch.size".16384);// Batch size
        props.put("linger.ms".1);// Wait time
        props.put("buffer.memory".33554432);//RecordAccumulator Buffer size
        props.put("key.serializer"."org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer"."org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {

                // The callback function is called asynchronously when Producer receives an ACK
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success->" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}

Synchronous send API

Synchronous sending means that after a message is sent, it blocks the current thread until an ACK is returned. Since the send method returns a Future object, we can implement synchronous sending by calling the Get method of the Future object.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers"."hadoop102:9092");// Kafka cluster, broker-list
        props.put("acks"."all");
        props.put("retries".1);// Number of retries
        props.put("batch.size".16384);// Batch size
        props.put("linger.ms".1);// Wait time
        props.put("buffer.memory".33554432);//RecordAccumulator Buffer size
        props.put("key.serializer"."org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer"."org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get(); } producer.close(); }}Copy the code

Consumer API

  • It’s easy to guarantee reliability when consuming data, because data is persisted in Kafka, so you don’t have to worry about data loss.
  • Since a consumer may hit faults such as power failures during consumption and must resume from where it left off after recovering, it needs to keep track of the offset it has consumed up to in real time. So offset maintenance is something a consumer must take into account when consuming data.

Automatically committing offsets

Import the dependency

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.4.1</version>
</dependency>

Write the code

  • KafkaConsumer: You need to create a consumer object to consume data
  • ConsumerConfig: Gets the required set of configuration parameters
  • ConsumerRecord: Each piece of data is encapsulated into a ConsumerRecord object

So that we can focus on business logic, Kafka provides automatic offset committing. The parameters related to automatic offset commit are:

enable.auto.commit: whether to enable automatic offset commit
auto.commit.interval.ms: the interval at which offsets are automatically committed

Code for automatically committing offsets

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers"."hadoop102:9092");
        props.put("group.id"."test");
        props.put("enable.auto.commit"."true");
        props.put("auto.commit.interval.ms"."1000");
        props.put("key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }}}Copy the code

Manually committing offsets

  • Although automatic offset commits are very convenient, they are time-based, so it is hard for a developer to control exactly when offsets are committed. Kafka therefore also provides APIs for committing offsets manually.
  • There are two ways to commit offsets manually: commitSync (synchronous commit) and commitAsync (asynchronous commit). What they have in common is that both commit the highest offset of the batch returned by the current poll. The difference is that commitSync blocks the current thread until the commit succeeds and automatically retries on failure (commits can still fail for reasons beyond your control), whereas commitAsync has no retry mechanism, so its commit may fail.

Synchronous commit code sample

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @author liubo
 */
public class CustomComsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers"."hadoop102:9092");/ / Kafka cluster
        props.put("group.id"."test");// If group. Id is the same, it belongs to the same consumer group
        props.put("enable.auto.commit"."false");// Disable automatic submission of offset
        props.put("key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));// Consumers subscribe to topics

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);// The consumer pulls data
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitSync(); // Synchronous commit: the current thread blocks until the offset commit succeeds
        }
    }
}

Asynchronously committing offsets

Committing offsets synchronously is more reliable, but it blocks the current thread until the commit succeeds, so throughput suffers noticeably. Asynchronous offset commits are therefore used more often.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @author liubo
 */
public class CustomConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers"."hadoop102:9092");/ / Kafka cluster
        props.put("group.id"."test");// If group. Id is the same, it belongs to the same consumer group
        props.put("enable.auto.commit"."false");// Disable automatic submission of offset
        props.put("key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));// Consumers subscribe to topics

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);// The consumer pulls data
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("Commit failed for " + offsets);
                    }
                }
            }); // Asynchronous commit
        }
    }
}

Analysis of missed and repeated consumption

Whether offsets are committed synchronously or asynchronously, missed or repeated consumption can occur. Committing the offset before processing the data may lead to missed consumption, while processing the data before committing the offset may lead to repeated consumption.
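
One common compromise (a sketch, not part of the original text): commit asynchronously inside the poll loop for throughput, and do one final synchronous commit before closing so that the last processed offsets are not lost. This narrows, but does not eliminate, the window for repeated consumption.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class MixedCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                consumer.commitAsync(); // fast, but may occasionally fail without retry
            }
        } finally {
            try {
                consumer.commitSync(); // blocking and retried: make the final offsets stick
            } finally {
                consumer.close();
            }
        }
    }
}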