1. Consumers and consumer groups

In Kafka, consumers usually belong to a consumer group, and multiple consumer groups can read the same topic without affecting one another. Kafka introduced consumer groups because consumers often perform high-latency operations, such as writing data to a database or to HDFS, or run time-consuming computations, so a single consumer may not be able to keep up with the rate at which data is produced. In that case you can add more consumers and let them share the load, with each consumer processing messages from a subset of the partitions. This is the main way Kafka achieves horizontal scaling.

Note that a partition can only be read by one consumer within the same consumer group; it is not possible for two consumers in the same group to read the same partition, as shown in the figure:

As you can see, Consumer5 is idle and does not read any partition, yet it still incurs overhead. This is a reminder to set the number of consumers appropriately: having more consumers in a group than there are partitions only produces idle consumers and wasted resources.

2. Partition rebalancing

Because all consumers in a group read the topic's partitions together, when a consumer shuts down or crashes it leaves the group, and the partitions it was reading are taken over by other consumers in the group. Partitions are also redistributed among consumers when the topic changes, for example when new partitions are added. This transfer of partition ownership from one consumer to another is called rebalancing, and it is what allows consumer groups to remain highly available and scalable.

Consumers maintain their membership in the group and their ownership of partitions by sending heartbeats to the broker that acts as the group coordinator. As long as a consumer sends heartbeats at regular intervals, it is considered alive and still reading messages from its partitions. The consumer sends heartbeats when it polls for messages or commits offsets. If it stops sending heartbeats for long enough, the session expires, the group coordinator considers it dead, and a rebalance is triggered.
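These timeouts are controlled by consumer configuration. A minimal sketch of the relevant settings, added to the consumer's Properties before the consumer is created (the values shown are illustrative, not recommendations):

/* Time without a heartbeat after which the coordinator considers the consumer dead */
props.put("session.timeout.ms", "10000");
/* How often the consumer sends heartbeats to the group coordinator */
props.put("heartbeat.interval.ms", "3000");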

3. Creating a Kafka consumer

The following three properties are required when creating a consumer:

  • bootstrap.servers: a list of broker addresses. The list does not need to contain every broker; the consumer will discover the rest of the cluster from the brokers given. However, it is recommended to provide at least two brokers for fault tolerance;
  • key.deserializer: the deserializer for keys;
  • value.deserializer: the deserializer for values.

In addition, you need to specify which topics to subscribe to, using one of the following two APIs:

  • consumer.subscribe(Collection<String> topics): subscribes to the specified collection of topics;
  • consumer.subscribe(Pattern pattern): subscribes to all topics whose names match the given regular expression (see the example below).
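For example, a regex-based subscription to every topic whose name starts with a hypothetical prefix could look like this (Pattern is java.util.regex.Pattern):

/* Subscribe to all topics whose names start with "log-" */
consumer.subscribe(Pattern.compile("log-.*"));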

Finally, you simply poll the server for data in a loop. Once the consumer has subscribed to a topic, polling handles all the details, including group coordination, partition rebalancing, heartbeats, and data fetching, so the developer only needs to process the data returned from the partitions. An example follows:

String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers"."hadoop001:9092");
/* Specifies the group ID*/
props.put("group.id", group);
props.put("key.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer"."org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

/* Subscribe to the topic(s) */
consumer.subscribe(Collections.singletonList(topic));

try {
    while (true) {
        /* Polling for data */
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", record.topic(), record.partition(), record.key(), record.value(), record.offset()); }}}finally {
    consumer.close();
}

All of the sample code for this article can be downloaded from Github: Kafka-basis

4. Automatically committing offsets

4.1 The importance of offsets

Every Kafka message has an offset attribute that records its position within its partition; the offset is a monotonically increasing integer. Consumers commit their offsets by sending messages to a special internal topic called __consumer_offsets, which stores the committed offset for each partition. If a consumer runs continuously, the committed offsets are of little use. However, if a consumer exits, crashes, or new partitions are added, a rebalance is triggered, and afterwards each consumer may be assigned partitions other than the ones it was processing before. To be able to continue, the consumer needs to read the last committed offset for each partition and resume processing from there. For this reason, committing offsets incorrectly can lead to data loss or duplicate consumption:

  • If the committed offset is smaller than the offset of the last message the client actually processed, the messages between the two offsets will be consumed again;
  • If the committed offset is larger than the offset of the last message the client actually processed, the messages between the two offsets will be lost.

4.2 Automatically committing offsets

Kafka supports both automatic and manual offset commits. Let's start with the simpler option, automatic commits:

To enable automatic commits, set the consumer's enable.auto.commit property to true. At a fixed interval, the consumer then commits the largest offset returned by the poll() method. The interval is configured by the auto.commit.interval.ms property and defaults to 5 s.
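A sketch of the corresponding configuration, added to the same Properties object used to create the consumer (the values shown are the defaults described above):

/* Enable automatic offset commits (true is the default) */
props.put("enable.auto.commit", "true");
/* Interval between automatic commits; 5000 ms is the default */
props.put("auto.commit.interval.ms", "5000");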

The pitfall of automatic commits: suppose we use the default 5 s commit interval and a rebalance occurs 3 s after the last commit. After the rebalance, consumers start reading from the last committed offset, which is now 3 s behind, so all messages that arrived during those 3 s are processed again. You can shorten the commit interval to commit offsets more frequently and narrow the window in which duplicates can occur, but the problem cannot be eliminated entirely. For this reason, Kafka also provides APIs for committing offsets manually, giving users more control over when offsets are committed.

5. Manually committing offsets

To commit offsets manually, set enable.auto.commit to false. Depending on your requirements, manual commits fall into two categories:

  • Commit the current offset: commit the largest offset returned by the current poll;
  • Commit a specific offset: commit a fixed offset chosen according to business requirements.

In terms of the Kafka API, manual commits can be further divided into synchronous commits and asynchronous commits.

5.1 Synchronous commit

A synchronous commit is made by calling consumer.commitSync(); when called without arguments, it commits the largest offset returned by the current poll.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record);
    }
    /* Commit synchronously */
    consumer.commitSync();
}

If a commit fails, a synchronous commit will retry, which makes the commit as likely as possible to succeed, but it also reduces the throughput of the program. For this reason, Kafka also provides an asynchronous commit API.

5.2 Asynchronous commit

Asynchronous commits improve throughput because the application can continue fetching data without waiting for the broker to respond to the commit request. The code is as follows:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record);
    }
    /* Commit asynchronously and define callback */
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                System.out.println("Error handling");
                offsets.forEach((x, y) -> System.out.printf("topic = %s, partition = %d, offset = %s \n",
                        x.topic(), x.partition(), y.offset()));
            }
        }
    });
}

The problem with asynchronous commits is that there is no automatic retry when a commit fails. Suppose the program commits offset 200 and then offset 300: the commit of 200 fails, but the later commit of 300 succeeds. If 200 were retried at that point, it could overwrite 300. Synchronous commits do not have this problem, because the commit request for 300 is not sent until the server has acknowledged the commit of 200. For this reason, some situations call for combining synchronous and asynchronous commits.

Note: although the client does not retry automatically on failure, you can retry manually. You can maintain a map of the offsets you have committed for each partition (for example a Map<TopicPartition, Long>). When a commit fails, check whether the failed offset is smaller than the last offset you committed for the same topic and partition. If it is smaller, a larger offset has already been committed and there is no need to retry; otherwise, you can retry manually (a sketch follows).
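A rough sketch of that idea, assuming a lastCommitted map maintained on the polling thread (this is illustrative only; it ignores ordering subtleties between in-flight commits and uses a synchronous retry for simplicity):

/* Highest offset known to be committed for each partition */
Map<TopicPartition, Long> lastCommitted = new HashMap<>();

consumer.commitAsync((committedOffsets, exception) -> {
    if (exception == null) {
        /* Record the offsets that were committed successfully */
        committedOffsets.forEach((tp, om) -> lastCommitted.merge(tp, om.offset(), Math::max));
    } else {
        committedOffsets.forEach((tp, om) -> {
            Long known = lastCommitted.get(tp);
            /* Retry only if no larger offset has been committed for this partition in the meantime */
            if (known == null || om.offset() > known) {
                consumer.commitSync(Collections.singletonMap(tp, om));
                lastCommitted.merge(tp, om.offset(), Math::max);
            }
        });
    }
});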

5.3 Combining synchronous and asynchronous commits

In this approach, asynchronous commits are used during normal polling to maintain throughput, but because the consumer is about to be closed, a final synchronous commit is made to maximize the chance that the last commit succeeds.

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
        }
        // Commit asynchronously
        consumer.commitAsync();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        // Because the consumer is about to be closed, synchronous commit is required to ensure a successful commit
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

5.4 Committing a specific offset

In the synchronous and asynchronous commit APIs above, we did not pass any arguments to the commit methods, so by default they commit the largest offset of the current poll. If you need to commit a specific offset, you can call their overloaded methods:

/* Synchronously commit the specified offset */
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) 
/* Commit a specific offset asynchronously */    
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

Note that because a consumer can subscribe to multiple topics, the offsets map is keyed by TopicPartition and can hold the offsets of partitions from several topics. The example code looks like this:

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
            /* Record the offset for each partition for each topic */
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData");
            /* TopicPartition overrides hashCode and equals, so the same topic-partition is not added twice */
            offsets.put(topicPartition, offsetAndMetadata);
        }
        /* Commit a specific offset */
        consumer.commitAsync(offsets, null);
    }
} finally {
    consumer.close();
}

6. Monitoring partition rebalancing

Because a rebalance redistributes partitions among consumers, you may sometimes want to perform some work before it happens, such as committing offsets that have been processed but not yet committed, or closing database connections. When subscribing to topics, you can call the overloaded subscribe() methods and pass in a custom partition rebalance listener:

 /* Subscribe to all topics in the specified collection */
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
 /* Subscribe to all topics matching the given regular expression */
subscribe(Pattern pattern, ConsumerRebalanceListener listener)    

The following is a code example:

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
    /* Called after the consumer has stopped reading messages, but before the rebalance begins */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Rebalancing is about to trigger.");
        // Commit the offsets that have already been processed
        consumer.commitSync(offsets);
    }

    /* This method is called after the partition is reallocated, but before the consumer starts reading messages */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }
});

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData");
            /* TopicPartition overrides hashCode and equals, so the same topic-partition is not added twice */
            offsets.put(topicPartition, offsetAndMetadata);
        }
        consumer.commitAsync(offsets, null);
    }
} finally {
    consumer.close();
}

7. Exiting the poll loop

Kafka provides a consumer.wakeup() method for exiting the poll loop: it causes poll() to throw a WakeupException, which breaks out of the loop. Note that it is best to call consumer.close() when exiting the thread. The consumer then commits any offsets that have not yet been committed and notifies the group coordinator that it is leaving the group, so a rebalance is triggered immediately instead of waiting for the session to time out.

The following example listens for console input, ends polling when "exit" is entered, closes the consumer, and exits the program:

/* Call wakeup to exit gracefully */
final Thread mainThread = Thread.currentThread();
new Thread(() -> {
    Scanner sc = new Scanner(System.in);
    while (sc.hasNext()) {
        if ("exit".equals(sc.next())) {
            consumer.wakeup();
            try {
                /* Wait for the main thread to complete the commit offset, close the consumer, etc. */
                mainThread.join();
                break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}).start();

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> rd : records) {
            System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset()); }}}catch (WakeupException e) {
    Wakeup () is a wakeup() call that causes an exception
} finally {
    consumer.close();
    System.out.println("Consumer shut down");
}

8. Standalone consumers

Because Kafka is designed for high throughput and low latency, and the processing capacity of a single consumer is limited, Kafka consumers usually belong to a group. Sometimes, however, your needs are much simpler: you may only need a single consumer to read data from all partitions of a topic, or from one specific partition. In that case you do not need a consumer group or rebalancing at all; you simply assign the topic or partitions to the consumer, then read messages and commit offsets.

In this case there is no need to subscribe to the topic; instead, the consumer assigns partitions to itself. A consumer can either subscribe to topics (and thereby join a consumer group) or assign partitions to itself, but it cannot do both. Example code for assigning partitions:

List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

/* You can specify which partitions to read. For example, suppose only partition 0 of the topic is read */
for (PartitionInfo partition : partitionInfos) {
    if (partition.partition() == 0) {
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
}

// Assign the selected partitions to the consumer
consumer.assign(partitions);


while (true) {
    ConsumerRecords<Integer, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<Integer, String> record : records) {
        System.out.printf("partition = %s, key = %d, value = %s\n",
                          record.partition(), record.key(), record.value());
    }
    consumer.commitSync();
}

Appendix: Kafka consumer optional properties

1. fetch.min.bytes

The minimum amount of data, in bytes, that the consumer will fetch from the server. If the amount of available data is less than this value, the broker waits until enough data accumulates before returning it to the consumer.

2. fetch.max.wait.ms

The maximum time the broker will wait before returning data to the consumer, even if fetch.min.bytes has not been satisfied. The default is 500 ms.

3. max.partition.fetch.bytes

This property specifies the maximum number of bytes the server can return to the consumer from each partition, which defaults to 1MB.

4. session.timeout.ms

The amount of time a consumer can go without sending a heartbeat before it is considered dead and a rebalance is triggered. The default is 10 s.

5. auto.offset.reset

This property specifies what the consumer should do when it reads a partition for which there is no committed offset, or for which the committed offset is invalid:

  • latest (the default): the consumer starts reading from the latest records, that is, records written after the consumer started;
  • earliest: the consumer reads the partition's records from the beginning.

6. enable.auto.commit

Whether to automatically commit offsets. The default is true. To avoid duplicate consumption and data loss, you can set it to false.

7. client.id

An identifier string that the brokers use to recognize requests sent from this client, for example in logs and metrics.

8. max.poll.records

The maximum number of records returned by a single call to the poll() method.

9. receive.buffer.bytes & send.buffer.bytes

The two parameters specify the size of the TCP socket buffer for receiving and sending packets, respectively. -1 indicates that the default value of the operating system is used.
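For reference, a sketch showing how some of these optional properties might be set when building a consumer (the values are purely illustrative, not tuning recommendations):

Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("group.id", "group1");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* Wait for at least 1 KB of data, but no longer than 500 ms, per fetch */
props.put("fetch.min.bytes", "1024");
props.put("fetch.max.wait.ms", "500");
/* Start from the earliest available records when there is no valid committed offset */
props.put("auto.offset.reset", "earliest");
/* Cap the number of records returned by a single poll() */
props.put("max.poll.records", "500");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);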

References

  1. Neha Narkhede, Gwen Shapira, Todd Palino. Kafka: The Definitive Guide (Chinese edition). Posts & Telecom Press, 2017-12-26

More articles in the big Data series can be found in the GitHub Open Source Project: Getting Started with Big Data